From: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>

Currently, the sheepdog recovery code has a race condition because the
worker thread can access recovery_work members that are not expected to
be read outside of the main thread.  For example, the worker thread can
read rw->oids in do_recover_object() while the main thread reallocates
it in finish_schedule_oids().  The root cause is that the main thread
passes recovering_work to the worker thread even though the variable is
not thread safe.
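As a rough illustration (a standalone sketch, not sheepdog code; all
names below are hypothetical), this is the shape of the race: a worker
thread walks an array while the main thread reallocates it, so the
worker may dereference a stale pointer:

/* illustrative sketch only -- not sheepdog code */
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

static uint64_t *oids;	/* meant to be touched by the main thread only */
static int count;

static void *worker(void *arg)
{
	int i;

	/* races with the realloc() below: "oids" may already have moved */
	for (i = 0; i < count; i++)
		(void)oids[i];
	return NULL;
}

int main(void)
{
	pthread_t t;

	count = 4;
	oids = calloc(count, sizeof(*oids));
	pthread_create(&t, NULL, worker, NULL);

	/* the main thread grows the list while the worker may still read it */
	oids = realloc(oids, 8 * sizeof(*oids));
	count = 8;

	pthread_join(&t, NULL);
	free(oids);
	return 0;
}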
This patch splits recovery_work into four types to remove the race
condition:

 - recovery_info      : This was recovery_work and can be accessed only
                        in the main thread.
 - recovery_list_work : This is used for preparing the object list in
                        the worker thread.
 - recovery_obj_work  : This is used for recovering objects in the
                        worker thread.
 - recovery_work      : This is a super class of recovery_list_work and
                        recovery_obj_work, and is also used for
                        notifying recovery completion in the worker
                        thread.

I think this also improves code readability and makes it easier to
extend the current recovery code to a multi-threaded one in the future.

Signed-off-by: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>
---
 sheep/recovery.c | 371 +++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 245 insertions(+), 126 deletions(-)

diff --git a/sheep/recovery.c b/sheep/recovery.c
index 919f597..8962742 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -18,21 +18,51 @@
 enum rw_state {
-	RW_INIT,
-	RW_RUN,
+	RW_PREPARE_LIST, /* the recovery thread is preparing object list */
+	RW_RECOVER_OBJ, /* the thread is recovering objects */
+	RW_NOTIFY_COMPLETION, /* the thread is notifying recovery completion */
 };
 
+/* base structure for the recovery thread */
 struct recovery_work {
+	uint32_t epoch;
+
+	struct vnode_info *old_vinfo;
+	struct vnode_info *cur_vinfo;
+
+	struct work work;
+};
+
+/* for preparing lists */
+struct recovery_list_work {
+	struct recovery_work base;
+
+	int count;
+	uint64_t *oids;
+};
+
+/* for recovering objects */
+struct recovery_obj_work {
+	struct recovery_work base;
+
+	uint64_t oid; /* the object to be recovered */
+	bool stop;
+};
+
+/*
+ * recovery information
+ *
+ * We cannot access the members of this structure outside of the main
+ * thread.
+ */
+struct recovery_info {
 	enum rw_state state;
 
 	uint32_t epoch;
 	uint32_t done;
-	bool stop;
 
-	struct work work;
 	/*
 	 * true when automatic recovery is disabled
-	 * and recovery process is suspended
+	 * and no recovery work is running
 	 */
 	bool suspended;
@@ -46,8 +76,10 @@ struct recovery_work {
 	struct vnode_info *cur_vinfo;
 };
 
-struct recovery_work *next_rw;
-static thread_unsafe(struct recovery_work *) recovering_work;
+struct recovery_info *next_rinfo;
+static thread_unsafe(struct recovery_info *) current_rinfo;
+
+static void queue_recovery_work(struct recovery_info *rinfo);
 
 /* Dynamically grown list buffer default as 4M (2T storage) */
 #define DEFAULT_LIST_BUFFER_SIZE (UINT64_C(1) << 22)
@@ -136,10 +168,11 @@ static bool is_invalid_vnode(const struct sd_vnode *entry,
  * the routine will try to recovery it from the nodes it has stayed,
  * at least, *theoretically* on consistent hash ring.
  */
-static int do_recover_object(struct recovery_work *rw)
+static int do_recover_object(struct recovery_obj_work *row)
 {
+	struct recovery_work *rw = &row->base;
 	struct vnode_info *old;
-	uint64_t oid = rw->oids[rw->done];
+	uint64_t oid = row->oid;
 	uint32_t epoch = rw->epoch, tgt_epoch = rw->epoch;
 	int nr_copies, ret, i, start = 0;
 
@@ -178,7 +211,7 @@ again:
 		/* Succeed */
 		break;
 	} else if (SD_RES_OLD_NODE_VER == ret) {
-		rw->stop = true;
+		row->stop = true;
 		goto err;
 	} else
 		ret = -1;
@@ -214,61 +247,61 @@ static void recover_object_work(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						 work);
-	uint64_t oid = rw->oids[rw->done];
+	struct recovery_obj_work *row = container_of(rw,
+						     struct recovery_obj_work,
+						     base);
+	uint64_t oid = row->oid;
 	int ret;
 
-	sd_eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64, rw->done,
-		   rw->count, oid);
-
 	if (sd_store->exist(oid)) {
 		sd_dprintf("the object is already recovered");
 		return;
 	}
 
-	ret = do_recover_object(rw);
+	ret = do_recover_object(row);
 	if (ret < 0)
 		sd_eprintf("failed to recover object %"PRIx64, oid);
 }
 
 bool node_in_recovery(void)
 {
-	return thread_unsafe_get(recovering_work) != NULL;
+	return thread_unsafe_get(current_rinfo) != NULL;
 }
 
 static inline void prepare_schedule_oid(uint64_t oid)
 {
-	struct recovery_work *rw = thread_unsafe_get(recovering_work);
+	struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
 	int i;
 
-	for (i = 0; i < rw->nr_prio_oids; i++)
-		if (rw->prio_oids[i] == oid)
+	for (i = 0; i < rinfo->nr_prio_oids; i++)
+		if (rinfo->prio_oids[i] == oid)
 			return;
 	/*
 	 * We need this check because oid might not be recovered.
 	 * Very much unlikely though, but it might happen indeed.
 	 */
-	for (i = 0; i < rw->done; i++)
-		if (rw->oids[i] == oid) {
+	for (i = 0; i < rinfo->done; i++)
+		if (rinfo->oids[i] == oid) {
 			sd_dprintf("%"PRIx64" not recovered, don't schedule it",
 				   oid);
 			return;
 		}
 
 	/* When recovery is not suspended, oid is currently being recovered */
-	if (!rw->suspended && rw->oids[rw->done] == oid)
+	if (!rinfo->suspended && rinfo->oids[rinfo->done] == oid)
 		return;
 
-	rw->nr_prio_oids++;
-	rw->prio_oids = xrealloc(rw->prio_oids,
-				 rw->nr_prio_oids * sizeof(uint64_t));
-	rw->prio_oids[rw->nr_prio_oids - 1] = oid;
-	sd_dprintf("%"PRIx64" nr_prio_oids %d", oid, rw->nr_prio_oids);
+	rinfo->nr_prio_oids++;
+	rinfo->prio_oids = xrealloc(rinfo->prio_oids,
+				    rinfo->nr_prio_oids * sizeof(uint64_t));
+	rinfo->prio_oids[rinfo->nr_prio_oids - 1] = oid;
+	sd_dprintf("%"PRIx64" nr_prio_oids %d", oid, rinfo->nr_prio_oids);
 
 	resume_suspended_recovery();
 }
 
 bool oid_in_recovery(uint64_t oid)
 {
-	struct recovery_work *rw = thread_unsafe_get(recovering_work);
+	struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
 	int i;
 
 	if (!node_in_recovery())
@@ -279,11 +312,11 @@ bool oid_in_recovery(uint64_t oid)
 		return false;
 	}
 
-	if (before(rw->epoch, sys->epoch))
+	if (before(rinfo->epoch, sys->epoch))
 		return true;
 
 	/* If we are in preparation of object list, oid is not recovered yet */
-	if (rw->state == RW_INIT)
+	if (rinfo->state == RW_PREPARE_LIST)
 		return true;
 
 	/*
@@ -291,15 +324,15 @@ bool oid_in_recovery(uint64_t oid)
 	 *
 	 * FIXME: do we need more efficient yet complex data structure?
 	 */
-	for (i = rw->done; i < rw->count; i++)
-		if (rw->oids[i] == oid)
+	for (i = rinfo->done; i < rinfo->count; i++)
+		if (rinfo->oids[i] == oid)
 			break;
 
 	/*
 	 * Newly created object after prepare_object_list() might not be
 	 * in the list
 	 */
-	if (i == rw->count) {
+	if (i == rinfo->count) {
 		sd_eprintf("%"PRIx64" is not in the recovery list", oid);
 		return false;
 	}
@@ -312,26 +345,49 @@ static void free_recovery_work(struct recovery_work *rw)
 {
 	put_vnode_info(rw->cur_vinfo);
 	put_vnode_info(rw->old_vinfo);
-	free(rw->oids);
 	free(rw);
 }
 
+static void free_recovery_list_work(struct recovery_list_work *rlw)
+{
+	put_vnode_info(rlw->base.cur_vinfo);
+	put_vnode_info(rlw->base.old_vinfo);
+	free(rlw->oids);
+	free(rlw);
+}
+
+static void free_recovery_obj_work(struct recovery_obj_work *row)
+{
+	put_vnode_info(row->base.cur_vinfo);
+	put_vnode_info(row->base.old_vinfo);
+	free(row);
+}
+
+static void free_recovery_info(struct recovery_info *rinfo)
+{
+	put_vnode_info(rinfo->cur_vinfo);
+	put_vnode_info(rinfo->old_vinfo);
+	free(rinfo->oids);
+	free(rinfo->prio_oids);
+	free(rinfo);
+}
+
 /* Return true if next recovery work is queued. */
-static inline bool run_next_rw(struct recovery_work *rw)
+static inline bool run_next_rw(void)
 {
-	struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, NULL);
+	struct recovery_info *nrinfo = uatomic_xchg_ptr(&next_rinfo, NULL);
 
-	if (nrw == NULL)
+	if (nrinfo == NULL)
 		return false;
 
-	free_recovery_work(rw);
+	free_recovery_info(thread_unsafe_get(current_rinfo));
 
 	if (sd_store->update_epoch)
-		sd_store->update_epoch(nrw->epoch);
+		sd_store->update_epoch(nrinfo->epoch);
 
-	thread_unsafe_set(recovering_work, nrw);
+	thread_unsafe_set(current_rinfo, nrinfo);
 	wakeup_all_requests();
-	queue_work(sys->recovery_wqueue, &nrw->work);
+	queue_recovery_work(nrinfo);
 	sd_dprintf("recovery work is superseded");
 	return true;
 }
@@ -361,27 +417,28 @@ static void notify_recovery_completion_main(struct work *work)
 	free_recovery_work(rw);
 }
 
-static inline void finish_recovery(struct recovery_work *rw)
+static inline void finish_recovery(struct recovery_info *rinfo)
 {
-	uint32_t recovered_epoch = rw->epoch;
-	thread_unsafe_set(recovering_work, NULL);
+	uint32_t recovered_epoch = rinfo->epoch;
+	thread_unsafe_set(current_rinfo, NULL);
 	wakeup_all_requests();
 
+	rinfo->state = RW_NOTIFY_COMPLETION;
+
 	/* notify recovery completion to other nodes */
-	rw->work.fn = notify_recovery_completion_work;
-	rw->work.done = notify_recovery_completion_main;
-	queue_work(sys->recovery_wqueue, &rw->work);
+	queue_recovery_work(rinfo);
+	free_recovery_info(rinfo);
 
 	sd_dprintf("recovery complete: new epoch %"PRIu32, recovered_epoch);
 }
 
-static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
+static inline bool oid_in_prio_oids(struct recovery_info *rinfo, uint64_t oid)
 {
 	int i;
 
-	for (i = 0; i < rw->nr_prio_oids; i++)
-		if (rw->prio_oids[i] == oid)
+	for (i = 0; i < rinfo->nr_prio_oids; i++)
+		if (rinfo->prio_oids[i] == oid)
 			return true;
 	return false;
 }
@@ -394,38 +451,38 @@ static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
  * we just move rw->prio_oids in between:
  * new_oids = [0..rw->done - 1] + [rw->prio_oids] + [rw->done]
  */
-static inline void finish_schedule_oids(struct recovery_work *rw)
+static inline void finish_schedule_oids(struct recovery_info *rinfo)
 {
-	int i, nr_recovered = rw->done, new_idx;
+	int i, nr_recovered = rinfo->done, new_idx;
 	uint64_t *new_oids;
 
 	/* If I am the last oid, done */
-	if (nr_recovered == rw->count - 1)
+	if (nr_recovered == rinfo->count - 1)
 		goto done;
 
 	new_oids = xmalloc(list_buffer_size);
-	memcpy(new_oids, rw->oids, nr_recovered * sizeof(uint64_t));
-	memcpy(new_oids + nr_recovered, rw->prio_oids,
-	       rw->nr_prio_oids * sizeof(uint64_t));
-	new_idx = nr_recovered + rw->nr_prio_oids;
+	memcpy(new_oids, rinfo->oids, nr_recovered * sizeof(uint64_t));
+	memcpy(new_oids + nr_recovered, rinfo->prio_oids,
+	       rinfo->nr_prio_oids * sizeof(uint64_t));
+	new_idx = nr_recovered + rinfo->nr_prio_oids;
 
-	for (i = rw->done; i < rw->count; i++) {
-		if (oid_in_prio_oids(rw, rw->oids[i]))
+	for (i = rinfo->done; i < rinfo->count; i++) {
+		if (oid_in_prio_oids(rinfo, rinfo->oids[i]))
 			continue;
-		new_oids[new_idx++] = rw->oids[i];
+		new_oids[new_idx++] = rinfo->oids[i];
 	}
 	/* rw->count should eq new_idx, otherwise something is wrong */
 	sd_dprintf("%snr_recovered %d, nr_prio_oids %d, count %d = new %d",
-		   rw->count == new_idx ? "" : "WARN: ", nr_recovered,
-		   rw->nr_prio_oids, rw->count, new_idx);
+		   rinfo->count == new_idx ? "" : "WARN: ", nr_recovered,
+		   rinfo->nr_prio_oids, rinfo->count, new_idx);
 
-	free(rw->oids);
-	rw->oids = new_oids;
+	free(rinfo->oids);
+	rinfo->oids = new_oids;
 done:
-	free(rw->prio_oids);
-	rw->prio_oids = NULL;
-	rw->nr_scheduled_prio_oids += rw->nr_prio_oids;
-	rw->nr_prio_oids = 0;
+	free(rinfo->prio_oids);
+	rinfo->prio_oids = NULL;
+	rinfo->nr_scheduled_prio_oids += rinfo->nr_prio_oids;
+	rinfo->nr_prio_oids = 0;
 }
 
 /*
@@ -435,37 +492,37 @@ done:
  * clients. Sheep recovers such objects for availability even when
 * automatic object recovery is not enabled.
  */
-static bool has_scheduled_objects(struct recovery_work *rw)
+static bool has_scheduled_objects(struct recovery_info *rinfo)
 {
-	return rw->done < rw->nr_scheduled_prio_oids;
+	return rinfo->done < rinfo->nr_scheduled_prio_oids;
 }
 
-static void recover_next_object(struct recovery_work *rw)
+static void recover_next_object(struct recovery_info *rinfo)
 {
-	if (run_next_rw(rw))
+	if (run_next_rw())
 		return;
 
-	if (rw->nr_prio_oids)
-		finish_schedule_oids(rw);
+	if (rinfo->nr_prio_oids)
+		finish_schedule_oids(rinfo);
 
-	if (sys->disable_recovery && !has_scheduled_objects(rw)) {
+	if (sys->disable_recovery && !has_scheduled_objects(rinfo)) {
 		sd_dprintf("suspended");
-		rw->suspended = true;
+		rinfo->suspended = true;
 		/* suspend until resume_suspended_recovery() is called */
 		return;
 	}
 
 	/* Try recover next object */
-	queue_work(sys->recovery_wqueue, &rw->work);
+	queue_recovery_work(rinfo);
 }
 
 void resume_suspended_recovery(void)
 {
-	struct recovery_work *rw = thread_unsafe_get(recovering_work);
+	struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
 
-	if (rw && rw->suspended) {
-		rw->suspended = false;
-		recover_next_object(rw);
+	if (rinfo && rinfo->suspended) {
+		rinfo->suspended = false;
+		recover_next_object(rinfo);
 	}
 }
 
@@ -473,10 +530,15 @@ static void recover_object_main(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						 work);
-	if (run_next_rw(rw))
-		return;
+	struct recovery_obj_work *row = container_of(rw,
+						     struct recovery_obj_work,
+						     base);
+	struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
 
-	if (rw->stop) {
+	if (run_next_rw())
+		goto out;
+
+	if (row->stop) {
 		/*
 		 * Stop this recovery process and wait for epoch to be
 		 * lifted and flush wait queue to requeue those
@@ -484,34 +546,49 @@ static void recover_object_main(struct work *work)
 		 */
 		wakeup_all_requests();
 		sd_dprintf("recovery is stopped");
-		return;
+		goto out;
 	}
 
-	wakeup_requests_on_oid(rw->oids[rw->done++]);
-	if (rw->done < rw->count) {
-		recover_next_object(rw);
-		return;
+	wakeup_requests_on_oid(row->oid);
+	rinfo->done++;
+
+	sd_eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64, rinfo->done,
+		   rinfo->count, row->oid);
+
+	if (rinfo->done < rinfo->count) {
+		recover_next_object(rinfo);
+		goto out;
 	}
 
-	finish_recovery(rw);
+	finish_recovery(rinfo);
+out:
+	free_recovery_obj_work(row);
 }
 
 static void finish_object_list(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						 work);
-	rw->state = RW_RUN;
-
-	if (run_next_rw(rw))
+	struct recovery_list_work *rlw = container_of(rw,
+						      struct recovery_list_work,
+						      base);
+	struct recovery_info *rinfo = thread_unsafe_get(current_rinfo);
+
+	rinfo->state = RW_RECOVER_OBJ;
+	rinfo->count = rlw->count;
+	rinfo->oids = rlw->oids;
+	rlw->oids = NULL;
+	free_recovery_list_work(rlw);
+
+	if (run_next_rw())
 		return;
 
-	if (!rw->count) {
-		finish_recovery(rw);
+	if (!rinfo->count) {
+		finish_recovery(rinfo);
 		return;
 	}
 
-	rw->work.fn = recover_object_work;
-	rw->work.done = recover_object_main;
-	recover_next_object(rw);
+
+	recover_next_object(rinfo);
 	return;
 }
 
@@ -553,11 +630,12 @@ retry:
 	}
 
 /* Screen out objects that don't belong to this node */
-static void screen_object_list(struct recovery_work *rw,
+static void screen_object_list(struct recovery_list_work *rlw,
 			       uint64_t *oids, size_t nr_oids)
 {
+	struct recovery_work *rw = &rlw->base;
 	const struct sd_vnode *vnodes[SD_MAX_COPIES];
-	int old_count = rw->count;
+	int old_count = rlw->count;
 	int nr_objs;
 	int i, j;
 
@@ -573,21 +651,22 @@ static void screen_object_list(struct recovery_work *rw,
 		for (j = 0; j < nr_objs; j++) {
 			if (!vnode_is_local(vnodes[j]))
 				continue;
-			if (bsearch(&oids[i], rw->oids, old_count,
+			if (bsearch(&oids[i], rlw->oids, old_count,
 				    sizeof(uint64_t), obj_cmp))
 				continue;
 
-			rw->oids[rw->count++] = oids[i];
+			rlw->oids[rlw->count++] = oids[i];
 			/* enlarge the list buffer if full */
-			if (rw->count == list_buffer_size / sizeof(uint64_t)) {
+			if (rlw->count == list_buffer_size / sizeof(uint64_t)) {
 				list_buffer_size *= 2;
-				rw->oids = xrealloc(rw->oids, list_buffer_size);
+				rlw->oids = xrealloc(rlw->oids,
						     list_buffer_size);
 			}
 			break;
 		}
 	}
 
-	qsort(rw->oids, rw->count, sizeof(uint64_t), obj_cmp);
+	qsort(rlw->oids, rlw->count, sizeof(uint64_t), obj_cmp);
 }
 
 static bool newly_joined(struct sd_node *node, struct recovery_work *rw)
@@ -603,6 +682,9 @@ static void prepare_object_list(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work,
 						 work);
+	struct recovery_list_work *rlw = container_of(rw,
+						      struct recovery_list_work,
+						      base);
 	struct sd_node *cur = rw->cur_vinfo->nodes;
 	int cur_nr = rw->cur_vinfo->nr_nodes;
 	int start = random() % cur_nr, i, end = cur_nr;
@@ -616,7 +698,7 @@ again:
 		size_t nr_oids;
 		struct sd_node *node = cur + i;
 
-		if (uatomic_read(&next_rw)) {
+		if (uatomic_read(&next_rinfo)) {
 			sd_dprintf("go to the next recovery");
 			return;
 		}
@@ -627,7 +709,7 @@ again:
 		oids = fetch_object_list(node, rw->epoch, &nr_oids);
 		if (!oids)
 			continue;
-		screen_object_list(rw, oids, nr_oids);
+		screen_object_list(rlw, oids, nr_oids);
 		free(oids);
 	}
 
@@ -637,7 +719,7 @@ again:
 		goto again;
 	}
 
-	sd_dprintf("%d", rw->count);
+	sd_dprintf("%d", rlw->count);
 }
 
 static inline bool node_is_gateway_only(void)
@@ -647,31 +729,28 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 {
-	struct recovery_work *rw;
+	struct recovery_info *rinfo;
 
 	if (node_is_gateway_only())
 		goto out;
 
-	rw = xzalloc(sizeof(struct recovery_work));
-	rw->state = RW_INIT;
-	rw->oids = xmalloc(list_buffer_size);
-	rw->epoch = sys->epoch;
-	rw->count = 0;
-
-	rw->cur_vinfo = grab_vnode_info(cur_vinfo);
-	rw->old_vinfo = grab_vnode_info(old_vinfo);
+	rinfo = xzalloc(sizeof(struct recovery_info));
+	rinfo->state = RW_PREPARE_LIST;
+	rinfo->epoch = sys->epoch;
+	rinfo->count = 0;
 
-	rw->work.fn = prepare_object_list;
-	rw->work.done = finish_object_list;
+	rinfo->cur_vinfo = grab_vnode_info(cur_vinfo);
+	rinfo->old_vinfo = grab_vnode_info(old_vinfo);
 
 	if (sd_store->update_epoch)
-		sd_store->update_epoch(rw->epoch);
+		sd_store->update_epoch(rinfo->epoch);
 
-	if (thread_unsafe_get(recovering_work) != NULL) {
+	if (thread_unsafe_get(current_rinfo) != NULL) {
 		/* skip the previous epoch recovery */
-		struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
-		if (nrw)
-			free_recovery_work(nrw);
+		struct recovery_info *nrinfo;
+		nrinfo = uatomic_xchg_ptr(&next_rinfo, rinfo);
+		if (nrinfo)
+			free_recovery_info(nrinfo);
 		sd_dprintf("recovery skipped");
 
 		/*
@@ -680,10 +759,50 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 		 */
 		resume_suspended_recovery();
 	} else {
-		thread_unsafe_set(recovering_work, rw);
-		queue_work(sys->recovery_wqueue, &rw->work);
+		thread_unsafe_set(current_rinfo, rinfo);
+		queue_recovery_work(rinfo);
 	}
 out:
 	wakeup_requests_on_epoch();
 	return 0;
 }
+
+static void queue_recovery_work(struct recovery_info *rinfo)
+{
+	struct recovery_work *rw;
+	struct recovery_list_work *rlw;
+	struct recovery_obj_work *row;
+
+	switch (rinfo->state) {
+	case RW_PREPARE_LIST:
+		rlw = xzalloc(sizeof(*rlw));
+		rlw->oids = xmalloc(list_buffer_size);
+
+		rw = &rlw->base;
+		rw->work.fn = prepare_object_list;
+		rw->work.done = finish_object_list;
+		break;
+	case RW_RECOVER_OBJ:
+		row = xzalloc(sizeof(*row));
+		row->oid = rinfo->oids[rinfo->done];
+
+		rw = &row->base;
+		rw->work.fn = recover_object_work;
+		rw->work.done = recover_object_main;
+		break;
+	case RW_NOTIFY_COMPLETION:
+		rw = xzalloc(sizeof(*rw));
+		rw->work.fn = notify_recovery_completion_work;
+		rw->work.done = notify_recovery_completion_main;
+		break;
+	default:
+		panic("unknown recovery state %d", rinfo->state);
+		break;
+	}
+
+	rw->epoch = rinfo->epoch;
+	rw->cur_vinfo = grab_vnode_info(rinfo->cur_vinfo);
+	rw->old_vinfo = grab_vnode_info(rinfo->old_vinfo);
+
+	queue_work(sys->recovery_wqueue, &rw->work);
+}
-- 
1.7.9.5

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog