From: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>

Currently, sheepdog recovery code has a race problem because the worker thread
can access recovery_work members which are not expected to be read outside of
the main thread.  For example, the worker thread can read rw->oids in
do_recover_object() while the main thread reallocates it in
finish_schedule_oids().  The root cause is that the main thread passes
recovering_work to the worker thread even though the variable is not thread 
safe.

This patch splits recovery_work into four types to remove the race condition:

 - recovery_info      : This was recovery_work and can be accessed only
                        in the main thread.
 - recovery_list_work : This is used for preparing object list in the worker
                        thread.
 - recovery_obj_work  : This is used for recovering objects
                        in the worker thread.
 - recovery_work      : This is a super class of recovery_list_work and
                        recovery_obj_work, and is also used for notifying
                        recovery completion in the worker thread.

I think this also improves code readability and makes it easier to extend the
current recovery code to multi-threaded in future.

Signed-off-by: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>
---
 sheep/recovery.c |  371 +++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 245 insertions(+), 126 deletions(-)

diff --git a/sheep/recovery.c b/sheep/recovery.c
index 919f597..8962742 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -18,21 +18,51 @@
 
 
 enum rw_state {
-       RW_INIT,
-       RW_RUN,
+       RW_PREPARE_LIST, /* the recovery thread is preparing object list */
+       RW_RECOVER_OBJ, /* the thread is recovering objects */
+       RW_NOTIFY_COMPLETION, /* the thread is notifying recovery completion */
 };
 
+/* base structure for the recovery thread */
 struct recovery_work {
+       uint32_t epoch;
+
+       struct vnode_info *old_vinfo;
+       struct vnode_info *cur_vinfo;
+
+       struct work work;
+};
+
+/* for preparing lists */
+struct recovery_list_work {
+       struct recovery_work base;
+
+       int count;
+       uint64_t *oids;
+};
+
+/* for recovering objects */
+struct recovery_obj_work {
+       struct recovery_work base;
+
+       uint64_t oid; /* the object to be recovered */
+       bool stop;
+};
+
+/*
+ * recovery information
+ *
+ * We cannot access the members of this structure outside of the main thread.
+ */
+struct recovery_info {
        enum rw_state state;
 
        uint32_t epoch;
        uint32_t done;
 
-       bool stop;
-       struct work work;
        /*
         * true when automatic recovery is disabled
-        * and recovery process is suspended
+        * and no recovery work is running
         */
        bool suspended;
 
@@ -46,8 +76,10 @@ struct recovery_work {
        struct vnode_info *cur_vinfo;
 };
 
-struct recovery_work *next_rw;
-static thread_unsafe(struct recovery_work *) recovering_work;
+struct recovery_info *next_rinfo;
+static thread_unsafe(struct recovery_info *) current_rinfo;
+
+static void queue_recovery_work(struct recovery_info *rinfo);
 
 /* Dynamically grown list buffer default as 4M (2T storage) */
 #define DEFAULT_LIST_BUFFER_SIZE (UINT64_C(1) << 22)
@@ -136,10 +168,11 @@ static bool is_invalid_vnode(const struct sd_vnode *entry,
  * the routine will try to recovery it from the nodes it has stayed,
  * at least, *theoretically* on consistent hash ring.
  */
-static int do_recover_object(struct recovery_work *rw)
+static int do_recover_object(struct recovery_obj_work *row)
 {
+       struct recovery_work *rw = &row->base;
        struct vnode_info *old;
-       uint64_t oid = rw->oids[rw->done];
+       uint64_t oid = row->oid;
        uint32_t epoch = rw->epoch, tgt_epoch = rw->epoch;
        int nr_copies, ret, i, start = 0;
 
@@ -178,7 +211,7 @@ again:
                        /* Succeed */
                        break;
                } else if (SD_RES_OLD_NODE_VER == ret) {
-                       rw->stop = true;
+                       row->stop = true;
                        goto err;
                } else
                        ret = -1;
@@ -214,61 +247,61 @@ static void recover_object_work(struct work *work)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work,
                                                work);
-       uint64_t oid = rw->oids[rw->done];
+       struct recovery_obj_work *row = container_of(rw,
+                                                    struct recovery_obj_work,
+                                                    base);
+       uint64_t oid = row->oid;
        int ret;
 
-       sd_eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64, rw->done,
-                  rw->count, oid);
-
        if (sd_store->exist(oid)) {
                sd_dprintf("the object is already recovered");
                return;
        }
 
-       ret = do_recover_object(rw);
+       ret = do_recover_object(row);
        if (ret < 0)
                sd_eprintf("failed to recover object %"PRIx64, oid);
 }
 
 bool node_in_recovery(void)
 {
-       return thread_unsafe_get(recovering_work) != NULL;
+       return thread_unsafe_get(current_rinfo) != NULL;
 }
 
 static inline void prepare_schedule_oid(uint64_t oid)
 {
-       struct recovery_work *rw = thread_unsafe_get(recovering_work);
+       struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
        int i;
 
-       for (i = 0; i < rw->nr_prio_oids; i++)
-               if (rw->prio_oids[i] == oid)
+       for (i = 0; i < rinfo->nr_prio_oids; i++)
+               if (rinfo->prio_oids[i] == oid)
                        return;
        /*
         * We need this check because oid might not be recovered.
         * Very much unlikely though, but it might happen indeed.
         */
-       for (i = 0; i < rw->done; i++)
-               if (rw->oids[i] == oid) {
+       for (i = 0; i < rinfo->done; i++)
+               if (rinfo->oids[i] == oid) {
                        sd_dprintf("%"PRIx64" not recovered, don't schedule it",
                                   oid);
                        return;
                }
        /* When recovery is not suspended, oid is currently being recovered */
-       if (!rw->suspended && rw->oids[rw->done] == oid)
+       if (!rinfo->suspended && rinfo->oids[rinfo->done] == oid)
                return;
 
-       rw->nr_prio_oids++;
-       rw->prio_oids = xrealloc(rw->prio_oids,
-                                rw->nr_prio_oids * sizeof(uint64_t));
-       rw->prio_oids[rw->nr_prio_oids - 1] = oid;
-       sd_dprintf("%"PRIx64" nr_prio_oids %d", oid, rw->nr_prio_oids);
+       rinfo->nr_prio_oids++;
+       rinfo->prio_oids = xrealloc(rinfo->prio_oids,
+                                   rinfo->nr_prio_oids * sizeof(uint64_t));
+       rinfo->prio_oids[rinfo->nr_prio_oids - 1] = oid;
+       sd_dprintf("%"PRIx64" nr_prio_oids %d", oid, rinfo->nr_prio_oids);
 
        resume_suspended_recovery();
 }
 
 bool oid_in_recovery(uint64_t oid)
 {
-       struct recovery_work *rw = thread_unsafe_get(recovering_work);
+       struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
        int i;
 
        if (!node_in_recovery())
@@ -279,11 +312,11 @@ bool oid_in_recovery(uint64_t oid)
                return false;
        }
 
-       if (before(rw->epoch, sys->epoch))
+       if (before(rinfo->epoch, sys->epoch))
                return true;
 
        /* If we are in preparation of object list, oid is not recovered yet */
-       if (rw->state == RW_INIT)
+       if (rinfo->state == RW_PREPARE_LIST)
                return true;
 
        /*
@@ -291,15 +324,15 @@ bool oid_in_recovery(uint64_t oid)
         *
         * FIXME: do we need more efficient yet complex data structure?
         */
-       for (i = rw->done; i < rw->count; i++)
-               if (rw->oids[i] == oid)
+       for (i = rinfo->done; i < rinfo->count; i++)
+               if (rinfo->oids[i] == oid)
                        break;
 
        /*
         * Newly created object after prepare_object_list() might not be
         * in the list
         */
-       if (i == rw->count) {
+       if (i == rinfo->count) {
                sd_eprintf("%"PRIx64" is not in the recovery list", oid);
                return false;
        }
@@ -312,26 +345,49 @@ static void free_recovery_work(struct recovery_work *rw)
 {
        put_vnode_info(rw->cur_vinfo);
        put_vnode_info(rw->old_vinfo);
-       free(rw->oids);
        free(rw);
 }
 
+static void free_recovery_list_work(struct recovery_list_work *rlw)
+{
+       put_vnode_info(rlw->base.cur_vinfo);
+       put_vnode_info(rlw->base.old_vinfo);
+       free(rlw->oids);
+       free(rlw);
+}
+
+static void free_recovery_obj_work(struct recovery_obj_work *row)
+{
+       put_vnode_info(row->base.cur_vinfo);
+       put_vnode_info(row->base.old_vinfo);
+       free(row);
+}
+
+static void free_recovery_info(struct recovery_info *rinfo)
+{
+       put_vnode_info(rinfo->cur_vinfo);
+       put_vnode_info(rinfo->old_vinfo);
+       free(rinfo->oids);
+       free(rinfo->prio_oids);
+       free(rinfo);
+}
+
 /* Return true if next recovery work is queued. */
-static inline bool run_next_rw(struct recovery_work *rw)
+static inline bool run_next_rw(void)
 {
-       struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, NULL);
+       struct recovery_info *nrinfo = uatomic_xchg_ptr(&next_rinfo, NULL);
 
-       if (nrw == NULL)
+       if (nrinfo == NULL)
                return false;
 
-       free_recovery_work(rw);
+       free_recovery_info(thread_unsafe_get(current_rinfo));
 
        if (sd_store->update_epoch)
-               sd_store->update_epoch(nrw->epoch);
+               sd_store->update_epoch(nrinfo->epoch);
 
-       thread_unsafe_set(recovering_work, nrw);
+       thread_unsafe_set(current_rinfo, nrinfo);
        wakeup_all_requests();
-       queue_work(sys->recovery_wqueue, &nrw->work);
+       queue_recovery_work(nrinfo);
        sd_dprintf("recovery work is superseded");
        return true;
 }
@@ -361,27 +417,28 @@ static void notify_recovery_completion_main(struct work 
*work)
        free_recovery_work(rw);
 }
 
-static inline void finish_recovery(struct recovery_work *rw)
+static inline void finish_recovery(struct recovery_info *rinfo)
 {
-       uint32_t recovered_epoch = rw->epoch;
-       thread_unsafe_set(recovering_work, NULL);
+       uint32_t recovered_epoch = rinfo->epoch;
+       thread_unsafe_set(current_rinfo, NULL);
 
        wakeup_all_requests();
 
+       rinfo->state = RW_NOTIFY_COMPLETION;
+
        /* notify recovery completion to other nodes */
-       rw->work.fn = notify_recovery_completion_work;
-       rw->work.done = notify_recovery_completion_main;
-       queue_work(sys->recovery_wqueue, &rw->work);
+       queue_recovery_work(rinfo);
+       free_recovery_info(rinfo);
 
        sd_dprintf("recovery complete: new epoch %"PRIu32, recovered_epoch);
 }
 
-static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
+static inline bool oid_in_prio_oids(struct recovery_info *rinfo, uint64_t oid)
 {
        int i;
 
-       for (i = 0; i < rw->nr_prio_oids; i++)
-               if (rw->prio_oids[i] == oid)
+       for (i = 0; i < rinfo->nr_prio_oids; i++)
+               if (rinfo->prio_oids[i] == oid)
                        return true;
        return false;
 }
@@ -394,38 +451,38 @@ static inline bool oid_in_prio_oids(struct recovery_work 
*rw, uint64_t oid)
  * we just move rw->prio_oids in between:
  *   new_oids = [0..rw->done - 1] + [rw->prio_oids] + [rw->done]
  */
-static inline void finish_schedule_oids(struct recovery_work *rw)
+static inline void finish_schedule_oids(struct recovery_info *rinfo)
 {
-       int i, nr_recovered = rw->done, new_idx;
+       int i, nr_recovered = rinfo->done, new_idx;
        uint64_t *new_oids;
 
        /* If I am the last oid, done */
-       if (nr_recovered == rw->count - 1)
+       if (nr_recovered == rinfo->count - 1)
                goto done;
 
        new_oids = xmalloc(list_buffer_size);
-       memcpy(new_oids, rw->oids, nr_recovered * sizeof(uint64_t));
-       memcpy(new_oids + nr_recovered, rw->prio_oids,
-              rw->nr_prio_oids * sizeof(uint64_t));
-       new_idx = nr_recovered + rw->nr_prio_oids;
+       memcpy(new_oids, rinfo->oids, nr_recovered * sizeof(uint64_t));
+       memcpy(new_oids + nr_recovered, rinfo->prio_oids,
+              rinfo->nr_prio_oids * sizeof(uint64_t));
+       new_idx = nr_recovered + rinfo->nr_prio_oids;
 
-       for (i = rw->done; i < rw->count; i++) {
-               if (oid_in_prio_oids(rw, rw->oids[i]))
+       for (i = rinfo->done; i < rinfo->count; i++) {
+               if (oid_in_prio_oids(rinfo, rinfo->oids[i]))
                        continue;
-               new_oids[new_idx++] = rw->oids[i];
+               new_oids[new_idx++] = rinfo->oids[i];
        }
        /* rw->count should eq new_idx, otherwise something is wrong */
        sd_dprintf("%snr_recovered %d, nr_prio_oids %d, count %d = new %d",
-                  rw->count == new_idx ? "" : "WARN: ", nr_recovered,
-                  rw->nr_prio_oids, rw->count, new_idx);
+                  rinfo->count == new_idx ? "" : "WARN: ", nr_recovered,
+                  rinfo->nr_prio_oids, rinfo->count, new_idx);
 
-       free(rw->oids);
-       rw->oids = new_oids;
+       free(rinfo->oids);
+       rinfo->oids = new_oids;
 done:
-       free(rw->prio_oids);
-       rw->prio_oids = NULL;
-       rw->nr_scheduled_prio_oids += rw->nr_prio_oids;
-       rw->nr_prio_oids = 0;
+       free(rinfo->prio_oids);
+       rinfo->prio_oids = NULL;
+       rinfo->nr_scheduled_prio_oids += rinfo->nr_prio_oids;
+       rinfo->nr_prio_oids = 0;
 }
 
 /*
@@ -435,37 +492,37 @@ done:
  * clients.  Sheep recovers such objects for availability even when
  * automatic object recovery is not enabled.
  */
-static bool has_scheduled_objects(struct recovery_work *rw)
+static bool has_scheduled_objects(struct recovery_info *rinfo)
 {
-       return rw->done < rw->nr_scheduled_prio_oids;
+       return rinfo->done < rinfo->nr_scheduled_prio_oids;
 }
 
-static void recover_next_object(struct recovery_work *rw)
+static void recover_next_object(struct recovery_info *rinfo)
 {
-       if (run_next_rw(rw))
+       if (run_next_rw())
                return;
 
-       if (rw->nr_prio_oids)
-               finish_schedule_oids(rw);
+       if (rinfo->nr_prio_oids)
+               finish_schedule_oids(rinfo);
 
-       if (sys->disable_recovery && !has_scheduled_objects(rw)) {
+       if (sys->disable_recovery && !has_scheduled_objects(rinfo)) {
                sd_dprintf("suspended");
-               rw->suspended = true;
+               rinfo->suspended = true;
                /* suspend until resume_suspended_recovery() is called */
                return;
        }
 
        /* Try recover next object */
-       queue_work(sys->recovery_wqueue, &rw->work);
+       queue_recovery_work(rinfo);
 }
 
 void resume_suspended_recovery(void)
 {
-       struct recovery_work *rw = thread_unsafe_get(recovering_work);
+       struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
 
-       if (rw && rw->suspended) {
-               rw->suspended = false;
-               recover_next_object(rw);
+       if (rinfo && rinfo->suspended) {
+               rinfo->suspended = false;
+               recover_next_object(rinfo);
        }
 }
 
@@ -473,10 +530,15 @@ static void recover_object_main(struct work *work)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work,
                                                work);
-       if (run_next_rw(rw))
-               return;
+       struct recovery_obj_work *row = container_of(rw,
+                                                    struct recovery_obj_work,
+                                                    base);
+       struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
 
-       if (rw->stop) {
+       if (run_next_rw())
+               goto out;
+
+       if (row->stop) {
                /*
                 * Stop this recovery process and wait for epoch to be
                 * lifted and flush wait queue to requeue those
@@ -484,34 +546,49 @@ static void recover_object_main(struct work *work)
                 */
                wakeup_all_requests();
                sd_dprintf("recovery is stopped");
-               return;
+               goto out;
        }
 
-       wakeup_requests_on_oid(rw->oids[rw->done++]);
-       if (rw->done < rw->count) {
-               recover_next_object(rw);
-               return;
+       wakeup_requests_on_oid(row->oid);
+       rinfo->done++;
+
+       sd_eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64, rinfo->done,
+                  rinfo->count, row->oid);
+
+       if (rinfo->done < rinfo->count) {
+               recover_next_object(rinfo);
+               goto out;
        }
 
-       finish_recovery(rw);
+       finish_recovery(rinfo);
+out:
+       free_recovery_obj_work(row);
 }
 
 static void finish_object_list(struct work *work)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work,
                                                work);
-       rw->state = RW_RUN;
-
-       if (run_next_rw(rw))
+       struct recovery_list_work *rlw = container_of(rw,
+                                                     struct recovery_list_work,
+                                                     base);
+       struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
+
+       rinfo->state = RW_RECOVER_OBJ;
+       rinfo->count = rlw->count;
+       rinfo->oids = rlw->oids;
+       rlw->oids = NULL;
+       free_recovery_list_work(rlw);
+
+       if (run_next_rw())
                return;
 
-       if (!rw->count) {
-               finish_recovery(rw);
+       if (!rinfo->count) {
+               finish_recovery(rinfo);
                return;
        }
-       rw->work.fn = recover_object_work;
-       rw->work.done = recover_object_main;
-       recover_next_object(rw);
+
+       recover_next_object(rinfo);
        return;
 }
 
@@ -553,11 +630,12 @@ retry:
 }
 
 /* Screen out objects that don't belong to this node */
-static void screen_object_list(struct recovery_work *rw,
+static void screen_object_list(struct recovery_list_work *rlw,
                               uint64_t *oids, size_t nr_oids)
 {
+       struct recovery_work *rw = &rlw->base;
        const struct sd_vnode *vnodes[SD_MAX_COPIES];
-       int old_count = rw->count;
+       int old_count = rlw->count;
        int nr_objs;
        int i, j;
 
@@ -573,21 +651,22 @@ static void screen_object_list(struct recovery_work *rw,
                for (j = 0; j < nr_objs; j++) {
                        if (!vnode_is_local(vnodes[j]))
                                continue;
-                       if (bsearch(&oids[i], rw->oids, old_count,
+                       if (bsearch(&oids[i], rlw->oids, old_count,
                                    sizeof(uint64_t), obj_cmp))
                                continue;
 
-                       rw->oids[rw->count++] = oids[i];
+                       rlw->oids[rlw->count++] = oids[i];
                        /* enlarge the list buffer if full */
-                       if (rw->count == list_buffer_size / sizeof(uint64_t)) {
+                       if (rlw->count == list_buffer_size / sizeof(uint64_t)) {
                                list_buffer_size *= 2;
-                               rw->oids = xrealloc(rw->oids, list_buffer_size);
+                               rlw->oids = xrealloc(rlw->oids,
+                                                    list_buffer_size);
                        }
                        break;
                }
        }
 
-       qsort(rw->oids, rw->count, sizeof(uint64_t), obj_cmp);
+       qsort(rlw->oids, rlw->count, sizeof(uint64_t), obj_cmp);
 }
 
 static bool newly_joined(struct sd_node *node, struct recovery_work *rw)
@@ -603,6 +682,9 @@ static void prepare_object_list(struct work *work)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work,
                                                work);
+       struct recovery_list_work *rlw = container_of(rw,
+                                                     struct recovery_list_work,
+                                                     base);
        struct sd_node *cur = rw->cur_vinfo->nodes;
        int cur_nr = rw->cur_vinfo->nr_nodes;
        int start = random() % cur_nr, i, end = cur_nr;
@@ -616,7 +698,7 @@ again:
                size_t nr_oids;
                struct sd_node *node = cur + i;
 
-               if (uatomic_read(&next_rw)) {
+               if (uatomic_read(&next_rinfo)) {
                        sd_dprintf("go to the next recovery");
                        return;
                }
@@ -627,7 +709,7 @@ again:
                oids = fetch_object_list(node, rw->epoch, &nr_oids);
                if (!oids)
                        continue;
-               screen_object_list(rw, oids, nr_oids);
+               screen_object_list(rlw, oids, nr_oids);
                free(oids);
        }
 
@@ -637,7 +719,7 @@ again:
                goto again;
        }
 
-       sd_dprintf("%d", rw->count);
+       sd_dprintf("%d", rlw->count);
 }
 
 static inline bool node_is_gateway_only(void)
@@ -647,31 +729,28 @@ static inline bool node_is_gateway_only(void)
 
 int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 {
-       struct recovery_work *rw;
+       struct recovery_info *rinfo;
 
        if (node_is_gateway_only())
                goto out;
 
-       rw = xzalloc(sizeof(struct recovery_work));
-       rw->state = RW_INIT;
-       rw->oids = xmalloc(list_buffer_size);
-       rw->epoch = sys->epoch;
-       rw->count = 0;
-
-       rw->cur_vinfo = grab_vnode_info(cur_vinfo);
-       rw->old_vinfo = grab_vnode_info(old_vinfo);
+       rinfo = xzalloc(sizeof(struct recovery_info));
+       rinfo->state = RW_PREPARE_LIST;
+       rinfo->epoch = sys->epoch;
+       rinfo->count = 0;
 
-       rw->work.fn = prepare_object_list;
-       rw->work.done = finish_object_list;
+       rinfo->cur_vinfo = grab_vnode_info(cur_vinfo);
+       rinfo->old_vinfo = grab_vnode_info(old_vinfo);
 
        if (sd_store->update_epoch)
-               sd_store->update_epoch(rw->epoch);
+               sd_store->update_epoch(rinfo->epoch);
 
-       if (thread_unsafe_get(recovering_work) != NULL) {
+       if (thread_unsafe_get(current_rinfo) != NULL) {
                /* skip the previous epoch recovery */
-               struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
-               if (nrw)
-                       free_recovery_work(nrw);
+               struct recovery_info *nrinfo;
+               nrinfo = uatomic_xchg_ptr(&next_rinfo, rinfo);
+               if (nrinfo)
+                       free_recovery_info(nrinfo);
                sd_dprintf("recovery skipped");
 
                /*
@@ -680,10 +759,50 @@ int start_recovery(struct vnode_info *cur_vinfo, struct 
vnode_info *old_vinfo)
                 */
                resume_suspended_recovery();
        } else {
-               thread_unsafe_set(recovering_work, rw);
-               queue_work(sys->recovery_wqueue, &rw->work);
+               thread_unsafe_set(current_rinfo, rinfo);
+               queue_recovery_work(rinfo);
        }
 out:
        wakeup_requests_on_epoch();
        return 0;
 }
+
+static void queue_recovery_work(struct recovery_info *rinfo)
+{
+       struct recovery_work *rw;
+       struct recovery_list_work *rlw;
+       struct recovery_obj_work *row;
+
+       switch (rinfo->state) {
+       case RW_PREPARE_LIST:
+               rlw = xzalloc(sizeof(*rlw));
+               rlw->oids = xmalloc(list_buffer_size);
+
+               rw = &rlw->base;
+               rw->work.fn = prepare_object_list;
+               rw->work.done = finish_object_list;
+               break;
+       case RW_RECOVER_OBJ:
+               row = xzalloc(sizeof(*row));
+               row->oid = rinfo->oids[rinfo->done];
+
+               rw = &row->base;
+               rw->work.fn = recover_object_work;
+               rw->work.done = recover_object_main;
+               break;
+       case RW_NOTIFY_COMPLETION:
+               rw = xzalloc(sizeof(*rw));
+               rw->work.fn = notify_recovery_completion_work;
+               rw->work.done = notify_recovery_completion_main;
+               break;
+       default:
+               panic("unknow recovery state %d", rinfo->state);
+               break;
+       }
+
+       rw->epoch = rinfo->epoch;
+       rw->cur_vinfo = grab_vnode_info(rinfo->cur_vinfo);
+       rw->old_vinfo = grab_vnode_info(rinfo->old_vinfo);
+
+       queue_work(sys->recovery_wqueue, &rw->work);
+}
-- 
1.7.9.5

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to