]> pilppa.com Git - linux-2.6-omap-h63xx.git/commitdiff
[PATCH] ocfs2: dlm recovery fixes
authorKurt Hackel <kurt.hackel@oracle.com>
Mon, 6 Mar 2006 22:08:49 +0000 (14:08 -0800)
committerMark Fasheh <mark.fasheh@oracle.com>
Fri, 24 Mar 2006 22:58:25 +0000 (14:58 -0800)
when starting lock mastery (excepting the recovery lock) wait on any nodes
needing recovery. fix one instance where lock resources were left attached
to the recovery list after recovery completed.  ensure that the node_down
code is run uniformly regardless of which node found the dead node first.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
fs/ocfs2/dlm/dlmcommon.h
fs/ocfs2/dlm/dlmlock.c
fs/ocfs2/dlm/dlmmaster.c
fs/ocfs2/dlm/dlmrecovery.c

index 9c772583744adcdae883d5d94bad396176e92e17..a8aec9341347f18eeb77c6429e2716fb77de4755 100644 (file)
@@ -658,6 +658,7 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
 int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
 
@@ -762,6 +763,11 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+                         u8 nodenum, u8 *real_master);
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                              struct dlm_lock_resource *res, u8 *real_master);
+
 
 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
                               struct dlm_lock_resource *res,
index 671d4ff222cc083c15aa63ed33048c899da2d26b..6fea28318d6da1b65295c3e1faec4b0406150110 100644 (file)
@@ -141,13 +141,23 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
                                          res->lockname.len)) {
                        kick_thread = 1;
                        call_ast = 1;
+               } else {
+                       mlog(0, "%s: returning DLM_NORMAL to "
+                            "node %u for reco lock\n", dlm->name,
+                            lock->ml.node);
                }
        } else {
                /* for NOQUEUE request, unless we get the
                 * lock right away, return DLM_NOTQUEUED */
-               if (flags & LKM_NOQUEUE)
+               if (flags & LKM_NOQUEUE) {
                        status = DLM_NOTQUEUED;
-               else {
+                       if (dlm_is_recovery_lock(res->lockname.name,
+                                                res->lockname.len)) {
+                               mlog(0, "%s: returning NOTQUEUED to "
+                                    "node %u for reco lock\n", dlm->name,
+                                    lock->ml.node);
+                       }
+               } else {
                        dlm_lock_get(lock);
                        list_add_tail(&lock->list, &res->blocked);
                        kick_thread = 1;
index 78ac3a00eb54235128f8fc7a162697efe5167cf8..940be4c13b1f09ff4703662f007692a6d4b81e89 100644 (file)
@@ -239,6 +239,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res,
                                       u8 target);
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+                                      struct dlm_lock_resource *res);
 
 
 int dlm_is_host_down(int errno)
@@ -677,6 +679,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
        struct dlm_node_iter iter;
        unsigned int namelen;
        int tries = 0;
+       int bit, wait_on_recovery = 0;
 
        BUG_ON(!lockid);
 
@@ -762,6 +765,18 @@ lookup:
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
                set_bit(dlm->node_num, mle->maybe_map);
                list_add(&mle->list, &dlm->master_list);
+
+               /* still holding the dlm spinlock, check the recovery map
+                * to see if there are any nodes that still need to be 
+                * considered.  these will not appear in the mle nodemap
+                * but they might own this lockres.  wait on them. */
+               bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+               if (bit < O2NM_MAX_NODES) {
+                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+                            "recover before lock mastery can begin\n",
+                            dlm->name, namelen, (char *)lockid, bit);
+                       wait_on_recovery = 1;
+               }
        }
 
        /* at this point there is either a DLM_MLE_BLOCK or a
@@ -779,6 +794,39 @@ lookup:
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
+       while (wait_on_recovery) {
+               /* any cluster changes that occurred after dropping the
+                * dlm spinlock would be detectable be a change on the mle,
+                * so we only need to clear out the recovery map once. */
+               if (dlm_is_recovery_lock(lockid, namelen)) {
+                       mlog(ML_NOTICE, "%s: recovery map is not empty, but "
+                            "must master $RECOVERY lock now\n", dlm->name);
+                       if (!dlm_pre_master_reco_lockres(dlm, res))
+                               wait_on_recovery = 0;
+                       else {
+                               mlog(0, "%s: waiting 500ms for heartbeat state "
+                                   "change\n", dlm->name);
+                               msleep(500);
+                       }
+                       continue;
+               } 
+
+               dlm_kick_recovery_thread(dlm);
+               msleep(100);
+               dlm_wait_for_recovery(dlm);
+
+               spin_lock(&dlm->spinlock);
+               bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+               if (bit < O2NM_MAX_NODES) {
+                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+                            "recover before lock mastery can begin\n",
+                            dlm->name, namelen, (char *)lockid, bit);
+                       wait_on_recovery = 1;
+               } else
+                       wait_on_recovery = 0;
+               spin_unlock(&dlm->spinlock);
+       }
+
        /* must wait for lock to be mastered elsewhere */
        if (blocked)
                goto wait;
@@ -1835,6 +1883,61 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
        mlog(0, "finished with dlm_assert_master_worker\n");
 }
 
+/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
+ * We cannot wait for node recovery to complete to begin mastering this
+ * lockres because this lockres is used to kick off recovery! ;-)
+ * So, do a pre-check on all living nodes to see if any of those nodes
+ * think that $RECOVERY is currently mastered by a dead node.  If so,
+ * we wait a short time to allow that node to get notified by its own
+ * heartbeat stack, then check again.  All $RECOVERY lock resources
+ * mastered by dead nodes are purged when the hearbeat callback is 
+ * fired, so we can know for sure that it is safe to continue once
+ * the node returns a live node or no node.  */
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+                                      struct dlm_lock_resource *res)
+{
+       struct dlm_node_iter iter;
+       int nodenum;
+       int ret = 0;
+       u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+       spin_lock(&dlm->spinlock);
+       dlm_node_iter_init(dlm->domain_map, &iter);
+       spin_unlock(&dlm->spinlock);
+
+       while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+               /* do not send to self */
+               if (nodenum == dlm->node_num)
+                       continue;
+               ret = dlm_do_master_requery(dlm, res, nodenum, &master);
+               if (ret < 0) {
+                       mlog_errno(ret);
+                       if (!dlm_is_host_down(ret))
+                               BUG();
+                       /* host is down, so answer for that node would be
+                        * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
+               }
+
+               if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
+                       /* check to see if this master is in the recovery map */
+                       spin_lock(&dlm->spinlock);
+                       if (test_bit(master, dlm->recovery_map)) {
+                               mlog(ML_NOTICE, "%s: node %u has not seen "
+                                    "node %u go down yet, and thinks the "
+                                    "dead node is mastering the recovery "
+                                    "lock.  must wait.\n", dlm->name,
+                                    nodenum, master);
+                               ret = -EAGAIN;
+                       }
+                       spin_unlock(&dlm->spinlock);
+                       mlog(0, "%s: reco lock master is %u\n", dlm->name, 
+                            master);
+                       break;
+               }
+       }
+       return ret;
+}
+
 
 /*
  * DLM_MIGRATE_LOCKRES
index 1e232000f3f7e1826dfaecab947afb937c222bd6..36610bdf12311a54f17339e0a66586929257d038 100644 (file)
@@ -58,7 +58,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
 static int dlm_recovery_thread(void *data);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 static int dlm_do_recovery(struct dlm_ctxt *dlm);
 
 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
@@ -78,15 +78,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
                                    u8 send_to,
                                    struct dlm_lock_resource *res,
                                    int total_locks);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-                                     struct dlm_lock_resource *res,
-                                     u8 *real_master);
 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_migratable_lockres *mres);
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-                                struct dlm_lock_resource *res,
-                                u8 nodenum, u8 *real_master);
 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
                                 u8 dead_node, u8 send_to);
@@ -165,7 +159,7 @@ void dlm_dispatch_work(void *data)
  * RECOVERY THREAD
  */
 
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
 {
        /* wake the recovery thread
         * this will wake the reco thread in one of three places
@@ -1316,9 +1310,8 @@ leave:
 
 
 
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-                                     struct dlm_lock_resource *res,
-                                     u8 *real_master)
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                              struct dlm_lock_resource *res, u8 *real_master)
 {
        struct dlm_node_iter iter;
        int nodenum;
@@ -1360,8 +1353,10 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
                ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
                if (ret < 0) {
                        mlog_errno(ret);
-                       BUG();
-                       /* TODO: need to figure a way to restart this */
+                       if (!dlm_is_host_down(ret))
+                               BUG();
+                       /* host is down, so answer for that node would be
+                        * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
                }
                if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
                        mlog(0, "lock master is %u\n", *real_master);
@@ -1372,9 +1367,8 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
 }
 
 
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-                                struct dlm_lock_resource *res,
-                                u8 nodenum, u8 *real_master)
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+                         u8 nodenum, u8 *real_master)
 {
        int ret = -EINVAL;
        struct dlm_master_requery req;
@@ -1739,6 +1733,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                                } else
                                        continue;
 
+                               if (!list_empty(&res->recovering)) {
+                                       mlog(0, "%s:%.*s: lockres was "
+                                            "marked RECOVERING, owner=%u\n",
+                                            dlm->name, res->lockname.len,
+                                            res->lockname.name, res->owner);
+                                       list_del_init(&res->recovering);
+                               }
                                spin_lock(&res->spinlock);
                                dlm_change_lockres_owner(dlm, res, new_master);
                                res->state &= ~DLM_LOCK_RES_RECOVERING;
@@ -2258,7 +2259,10 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
                        mlog(0, "%u not in domain/live_nodes map "
                             "so setting it in reco map manually\n",
                             br->dead_node);
-               set_bit(br->dead_node, dlm->recovery_map);
+               /* force the recovery cleanup in __dlm_hb_node_down
+                * both of these will be cleared in a moment */
+               set_bit(br->dead_node, dlm->domain_map);
+               set_bit(br->dead_node, dlm->live_nodes_map);
                __dlm_hb_node_down(dlm, br->dead_node);
        }
        spin_unlock(&dlm->spinlock);