We have to manipulate next_rw atomically because it can be accessed concurrently by both the main thread and worker threads.
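
For illustration only (not part of this patch): a minimal standalone sketch of the race the atomic exchange avoids. It uses C11 <stdatomic.h> as a stand-in for the liburcu uatomic API that sheep actually uses, and the helpers post_work()/claim_work() are made up for this example.

/*
 * Standalone sketch (not sheep code): why next_rw must be claimed with
 * an atomic exchange.  C11 <stdatomic.h> stands in for the uatomic API;
 * post_work()/claim_work() are hypothetical helpers for this example.
 */
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct recovery_work {
	int epoch;
};

static _Atomic(struct recovery_work *) next_rw;

/* Main thread: supersede any queued work with a newer one. */
static void post_work(struct recovery_work *rw)
{
	struct recovery_work *old = atomic_exchange(&next_rw, rw);

	if (old)
		free(old);	/* the superseded work was never picked up */
}

/* Worker thread: claim the queued work, or NULL if none is pending. */
static struct recovery_work *claim_work(void)
{
	/*
	 * Read and clear in one step.  A non-atomic
	 * "if (next_rw) { w = next_rw; next_rw = NULL; }" would let a
	 * concurrent post_work() land between the load and the store,
	 * leaking or double-freeing a recovery_work.
	 */
	return atomic_exchange(&next_rw, NULL);
}

int main(void)
{
	struct recovery_work *rw = calloc(1, sizeof(*rw));
	struct recovery_work *claimed;

	rw->epoch = 1;
	post_work(rw);

	claimed = claim_work();
	if (claimed) {
		printf("claimed work for epoch %d\n", claimed->epoch);
		free(claimed);
	}
	return 0;
}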
Signed-off-by: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>
---
 include/util.h   |  8 ++++++++
 sheep/recovery.c | 37 +++++++++++++++++++------------------
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/include/util.h b/include/util.h
index 38efb8b..18fc6fa 100644
--- a/include/util.h
+++ b/include/util.h
@@ -133,6 +133,14 @@ static inline void uatomic_set_false(uatomic_bool *val)
 	uatomic_set(&val->val, 0);
 }
 
+/* uatomic_xchg for pointers */
+#define uatomic_xchg_ptr(p, val)				\
+({								\
+	uintptr_t ret;						\
+	ret = uatomic_xchg((uintptr_t *)(p), (val));		\
+	(typeof(*(p)))ret;					\
+})
+
 /* colors */
 #define TEXT_NORMAL     "\033[0m"
 #define TEXT_BOLD       "\033[1m"
diff --git a/sheep/recovery.c b/sheep/recovery.c
index e3a60f4..0be846c 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -301,15 +301,20 @@ static void free_recovery_work(struct recovery_work *rw)
 	free(rw);
 }
 
-static inline void run_next_rw(struct recovery_work *rw)
+/* Return true if next recovery work is queued. */
+static inline bool run_next_rw(struct recovery_work *rw)
 {
+	struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, NULL);
+
+	if (nrw == NULL)
+		return false;
+
 	free_recovery_work(rw);
-	rw = next_rw;
-	next_rw = NULL;
-	recovering_work = rw;
+	recovering_work = nrw;
 	wakeup_all_requests();
-	queue_work(sys->recovery_wqueue, &rw->work);
+	queue_work(sys->recovery_wqueue, &nrw->work);
 	sd_dprintf("recovery work is superseded");
+	return true;
 }
 
 static void notify_recovery_completion_work(struct work *work)
@@ -421,10 +426,8 @@ static bool has_scheduled_objects(struct recovery_work *rw)
 
 static void recover_next_object(struct recovery_work *rw)
 {
-	if (next_rw) {
-		run_next_rw(rw);
+	if (run_next_rw(rw))
 		return;
-	}
 
 	if (rw->nr_prio_oids)
 		finish_schedule_oids(rw);
@@ -452,10 +455,8 @@ static void recover_object_main(struct work *work)
 {
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
 
-	if (next_rw) {
-		run_next_rw(rw);
+	if (run_next_rw(rw))
 		return;
-	}
 
 	if (rw->stop) {
 		/*
@@ -482,10 +483,10 @@ static void finish_object_list(struct work *work)
 	struct recovery_work *rw = container_of(work, struct recovery_work, work);
 	rw->state = RW_RUN;
-	if (next_rw) {
-		run_next_rw(rw);
+
+	if (run_next_rw(rw))
 		return;
-	}
+
 	if (!rw->count) {
 		finish_recovery(rw);
 		return;
 	}
@@ -598,7 +599,7 @@ again:
 		size_t nr_oids;
 		struct sd_node *node = cur + i;
 
-		if (next_rw) {
+		if (uatomic_read(&next_rw)) {
 			sd_dprintf("go to the next recovery");
 			return;
 		}
@@ -648,10 +649,10 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
 
 	if (recovering_work != NULL) {
 		/* skip the previous epoch recovery */
-		if (next_rw)
-			free_recovery_work(next_rw);
+		struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
+		if (nrw)
+			free_recovery_work(nrw);
 		sd_dprintf("recovery skipped");
-		next_rw = rw;
 
 		/*
 		 * This is necesary to invoke run_next_rw when
-- 
1.8.1.3.566.gaa39828