The commit is pushed to "branch-rh7-3.10.0-693.21.1.vz7.50.x-ovz" and will appear
at https://src.openvz.org/scm/ovz/vzkernel.git after rh7-3.10.0-693.21.1.vz7.50.1
------>
commit 785731c50a55a452f5ee0aecc0fa438c7edfba59
Author: Alexey Kuznetsov <kuz...@virtuozzo.com>
Date:   Wed May 23 11:04:18 2018 +0300
    fuse kio_pcs: ports from new user-space

    The new token-based congestion avoidance is ported to the kernel.

    Also, some bugs in fuse_kio_pcs congestion avoidance are fixed. There
    was a race condition: cwnd could drop to 0 from another thread, and
    the congestion queue would get stuck. We now close the race by moving
    the cwnd check under cs->lock; if the cs turns out not to be congested
    while cs->lock is held, the request is submitted immediately.

Signed-off-by: Alexey Kuznetsov <kuz...@virtuozzo.com>
Signed-off-by: Kirill Tkhai <ktk...@virtuozzo.com>
---
 fs/fuse/kio/pcs/pcs_cluster.c      |  31 +++-
 fs/fuse/kio/pcs/pcs_cs.c           |  25 ++--
 fs/fuse/kio/pcs/pcs_cs.h           |  15 +-
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  10 +-
 fs/fuse/kio/pcs/pcs_map.c          | 287 ++++++++++++++++++++++---------------
 fs/fuse/kio/pcs/pcs_map.h          |   7 +-
 fs/fuse/kio/pcs/pcs_req.h          |  15 ++
 7 files changed, 241 insertions(+), 149 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index 79071655f6e5..24ec8a5f39a3 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -18,6 +18,7 @@
 #include "../../fuse_i.h"
 
 void pcs_cc_process_ireq_chunk(struct pcs_int_request *ireq);
+static void ireq_process_(struct pcs_int_request *ireq);
 
 static inline int is_file_inline(struct pcs_dentry_info *di)
 {
@@ -226,6 +227,9 @@ static int fiemap_worker(void * arg)
 
 		sreq->dentry = di;
 		sreq->type = PCS_IREQ_IOCHUNK;
+		INIT_LIST_HEAD(&sreq->tok_list);
+		sreq->tok_reserved = 0;
+		sreq->tok_serno = 0;
 		sreq->iochunk.map = NULL;
 		sreq->iochunk.flow = pcs_flow_record(&di->mapping.ftab, 0, pos, end-pos, &di->cluster->maps.ftab);
 		sreq->iochunk.cmd = PCS_REQ_T_FIEMAP;
@@ -280,7 +284,7 @@ void pcs_cc_process_ireq_chunk(struct pcs_int_request *ireq)
 			pcs_map_put(ireq->iochunk.map);
 		ireq->iochunk.map = map;
 
-	map_submit(map, ireq, 0);
+	map_submit(map, ireq);
 }
 
 /* TODO Remove noinline in production */
@@ -325,6 +329,9 @@ static noinline void __pcs_cc_process_ireq_rw(struct pcs_int_request *ireq)
 
 		sreq->dentry = di;
 		sreq->type = PCS_IREQ_IOCHUNK;
+		INIT_LIST_HEAD(&sreq->tok_list);
+		sreq->tok_reserved = 0;
+		sreq->tok_serno = 0;
 		sreq->iochunk.map = NULL;
 		sreq->iochunk.flow = pcs_flow_get(fl);
 		sreq->iochunk.cmd = ireq->apireq.req->type;
@@ -391,6 +398,25 @@ static void pcs_cc_process_ireq_ioreq(struct pcs_int_request *ireq)
 	return __pcs_cc_process_ireq_rw(ireq);
 }
 
+static void process_ireq_token(struct pcs_int_request * ireq)
+{
+	struct pcs_int_request * parent = ireq->token.parent;
+
+	if (parent) {
+		int do_execute = 0;
+
+		spin_lock(&parent->completion_data.child_lock);
+		if (ireq->token.parent) {
+			ireq_drop_tokens(parent);
+			do_execute = 1;
+		}
+		spin_unlock(&parent->completion_data.child_lock);
+		if (do_execute)
+			ireq_process_(parent);
+	}
+	ireq_destroy(ireq);
+}
+
 static void ireq_process_(struct pcs_int_request *ireq)
 {
 	struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
@@ -425,6 +451,9 @@ static void ireq_process_(struct pcs_int_request *ireq)
 	case PCS_IREQ_CUSTOM:
 		ireq->custom.action(ireq);
 		break;
+	case PCS_IREQ_TOKEN:
+		process_ireq_token(ireq);
+		break;
 	default:
 		BUG();
 		break;
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 151b06cb18cf..cae591e65a42 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -694,8 +694,8 @@ void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
 	if (cs == NULL)
 		return;
 
+	list_splice_tail_init(&cs->active_list, &queue);
 	list_splice_tail_init(&cs->cong_queue, &queue);
-	clear_bit(CS_SF_CONGESTED, &cs->state);
 	cs->cong_queue_len = 0;
 	cs_blacklist(cs, err->value, "notify error");
 	spin_unlock(&cs->lock);
@@ -710,9 +710,7 @@ static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose)
 
 	list_splice_tail_init(&cs->active_list, dispose);
 	list_splice_tail_init(&cs->cong_queue, dispose);
-	cs->active_list_len = 0;
 	cs->cong_queue_len = 0;
-	clear_bit(CS_SF_CONGESTED, &cs->state);
 	cs->is_dead = 1;
 
 	spin_lock(&cs->css->lock);
@@ -920,7 +918,7 @@ void cs_decrement_in_flight(struct pcs_cs *cs, unsigned int to_dec)
 	if (cs->in_flight < cs->eff_cwnd) {
 		cs->cwr_state = 0;
-		pcs_cs_flush_cong_queue(cs);
+		pcs_cs_activate_cong_queue(cs);
 	}
 
 	if (cs->in_flight == 0)
 		cs->idle_stamp = jiffies;
@@ -1066,6 +1064,7 @@ void pcs_csset_init(struct pcs_cs_set *css)
 	INIT_DELAYED_WORK(&css->bl_work, bl_timer_work);
 	css->ncs = 0;
 	spin_lock_init(&css->lock);
+	atomic64_set(&css->csl_serno_gen, 0);
 }
 
 void pcs_csset_fini(struct pcs_cs_set *css)
@@ -1146,14 +1145,18 @@ void pcs_cs_set_stat_up(struct pcs_cs_set *set)
 	pcs_cs_for_each_entry(set, do_update_stat, 0);
 }
 
-void pcs_cs_cong_enqueue(struct pcs_int_request *ireq, struct pcs_cs *cs)
+int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs)
 {
+	int queued = 0;
+
 	spin_lock(&cs->lock);
-	if (test_bit(CS_SF_CONGESTED, &cs->state))
-		test_bit(CS_SF_CONGESTED, &cs->state);
-	list_add_tail(&ireq->list, &cs->cong_queue);
-	cs->cong_queue_len++;
-	if (!ireq->qdepth)
-		ireq->qdepth = cs->cong_queue_len + cs->active_list_len;
+	if (cs->in_flight >= cs->eff_cwnd) {
+		list_add_tail(&ireq->list, &cs->cong_queue);
+		cs->cong_queue_len++;
+		if (!ireq->qdepth)
+			ireq->qdepth = cs->cong_queue_len;
+		queued = 1;
+	}
 	spin_unlock(&cs->lock);
+	return queued;
 }
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index f46b31f2633d..eb81ac51f3ae 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -36,7 +36,6 @@ enum {
 	CS_SF_FAILED,
 	CS_SF_BLACKLISTED,
 	CS_SF_ACTIVE,
-	CS_SF_CONGESTED,
 };
 
 struct pcs_cs {
@@ -66,14 +65,13 @@ struct pcs_cs {
 	struct list_head	cong_queue;
 	int			cong_queue_len;
 	struct list_head	active_list;
-	int			active_list_len;
 
 	pcs_cs_io_prio_t	io_prio;
 	pcs_cs_net_prio_t	net_prio;
 	u8			mds_flags;
 	abs_time_t		io_prio_stamp;
 
-	struct list_head  flow_lru;
+	struct list_head	flow_lru;
 	int			nflows;
 
 	unsigned long		state;
@@ -104,24 +102,20 @@ static inline void pcs_cs_init_cong_queue(struct pcs_cs *cs)
 {
 	INIT_LIST_HEAD(&cs->cong_queue);
 	cs->cong_queue_len = 0;
-	clear_bit(CS_SF_CONGESTED, &cs->state);
 }
 
 static inline void pcs_cs_init_active_list(struct pcs_cs *cs)
 {
 	INIT_LIST_HEAD(&cs->active_list);
-	cs->active_list_len = 0;
 }
 
-static inline void pcs_cs_flush_cong_queue(struct pcs_cs *cs)
+static inline void pcs_cs_activate_cong_queue(struct pcs_cs *cs)
 {
 	assert_spin_locked(&cs->lock);
-	list_splice_tail(&cs->cong_queue, &cs->active_list);
-	cs->active_list_len += cs->cong_queue_len;
-	pcs_cs_init_cong_queue(cs);
+	list_splice_tail_init(&cs->cong_queue, &cs->active_list);
 }
 
-void pcs_cs_cong_enqueue(struct pcs_int_request *ireq, struct pcs_cs *cs);
+int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs);
 
 #define PCS_CS_HASH_SIZE	1024
 
@@ -132,6 +126,7 @@ struct pcs_cs_set {
 	struct delayed_work	bl_work;
 	unsigned int		ncs;
 	spinlock_t		lock;
+	atomic64_t		csl_serno_gen;
 };
 
 void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq);
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index f4c0fb15403e..61378f0d9a58 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -843,11 +843,6 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
 		if (inarg->offset >= di->fileinfo.attr.size)
 			inarg->mode &= ~FALLOC_FL_ZERO_RANGE;
 
-		if (inarg->mode & FALLOC_FL_KEEP_SIZE) {
-			if (inarg->offset + inarg->length > di->fileinfo.attr.size)
-				inarg->length = di->fileinfo.attr.size - inarg->offset;
-		}
-
 		if (inarg->mode & (FALLOC_FL_ZERO_RANGE|FALLOC_FL_PUNCH_HOLE)) {
 			if ((inarg->offset & (PAGE_SIZE - 1)) || (inarg->length & (PAGE_SIZE - 1))) {
 				r->req.out.h.error = -EINVAL;
@@ -855,6 +850,11 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
 			}
 		}
 
+		if (inarg->mode & FALLOC_FL_KEEP_SIZE) {
+			if (inarg->offset + inarg->length > di->fileinfo.attr.size)
+				inarg->length = di->fileinfo.attr.size - inarg->offset;
+		}
+
 		ret = pcs_fuse_prep_rw(r);
 		if (!ret)
 			goto submit;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 085f49fd0219..90e311f7e995 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -911,8 +911,8 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
 	atomic_set(&cs_list->refcnt, 1);
 	atomic_set(&cs_list->seq_read_in_flight, 0);
 	cs_list->read_index = -1;
-	cs_list->cong_index = -1 ;
 	cs_list->flags = 0;
+	cs_list->serno = atomic64_inc_return(&css->csl_serno_gen);
 	cs_list->blacklist = 0;
 	cs_list->read_timeout = (read_tout * HZ) / 1000;
 	cs_list->write_timeout = (write_tout * HZ) / 1000;
@@ -1380,7 +1380,7 @@ static void pcs_cs_deaccount(struct pcs_int_request *ireq, struct pcs_cs * cs, i
 	spin_unlock(&cs->lock);
 }
 
-static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
+static void pcs_cs_wakeup(struct pcs_cs * cs)
 {
 	struct pcs_int_request * sreq;
 	struct pcs_map_entry * map;
@@ -1393,11 +1393,32 @@ static void pcs_cs_wakeup(struct pcs_cs * cs)
 			break;
 		}
 		sreq = list_first_entry(&cs->active_list, struct pcs_int_request, list);
-		BUG_ON(!cs->active_list_len);
 		list_del_init(&sreq->list);
-		cs->active_list_len--;
+		cs->cong_queue_len--;
 		spin_unlock(&cs->lock);
+		if (sreq->type == PCS_IREQ_TOKEN) {
+			struct pcs_int_request * parent = sreq->token.parent;
+			int do_execute = 0;
+
+			if (parent == NULL) {
+				ireq_destroy(sreq);
+				continue;
+			}
+
+			spin_lock(&parent->completion_data.child_lock);
+			if (sreq->token.parent) {
+				parent->tok_reserved |= (1ULL << sreq->token.cs_index);
+				list_del(&sreq->token.tok_link);
+				do_execute = list_empty(&parent->tok_list);
+			}
+			spin_unlock(&parent->completion_data.child_lock);
+			ireq_destroy(sreq);
+			if (!do_execute)
+				continue;
+			sreq = parent;
+		}
+
 		if (sreq->type != PCS_IREQ_FLUSH) {
 			map = pcs_find_get_map(sreq->dentry, sreq->iochunk.chunk +
 					       ((sreq->flags & IREQ_F_MAPPED) ?
						0 : sreq->iochunk.offset));
@@ -1412,7 +1433,7 @@ static void pcs_cs_wakeup(struct pcs_cs * cs)
 						       preq->apireq.req->pos, preq->apireq.req->size,
 						       &sreq->cc->maps.ftab);
 			}
-			map_submit(map, sreq, requeue);
+			map_submit(map, sreq);
 		} else {
 			map_queue_on_limit(sreq);
 		}
@@ -1422,58 +1443,33 @@ static void pcs_cs_wakeup(struct pcs_cs * cs)
 				pcs_clear_error(&sreq->error);
 			ireq_complete(sreq);
 		} else
-			map_submit(map, sreq, requeue);
+			map_submit(map, sreq);
 	}
 }
 
 static int __pcs_cs_still_congested(struct pcs_cs * cs)
 {
+	if (!list_empty(&cs->active_list))
+		list_splice_tail_init(&cs->active_list, &cs->cong_queue);
 
-	assert_spin_locked(&cs->lock);
-
-	if (!list_empty(&cs->active_list)) {
-		BUG_ON(!cs->active_list_len);
-		list_splice_tail(&cs->active_list, &cs->cong_queue);
-		cs->cong_queue_len += cs->active_list_len;
-		set_bit(CS_SF_CONGESTED, &cs->state);
-		pcs_cs_init_active_list(cs);
-	} else if (list_empty(&cs->cong_queue)) {
+	if (list_empty(&cs->cong_queue)) {
 		BUG_ON(cs->cong_queue_len);
-		BUG_ON(test_bit(CS_SF_CONGESTED, &cs->state));
 		return 0;
-	} else {
-		BUG_ON(cs->active_list_len);
 	}
 
-	if (cs->in_flight >= cs->eff_cwnd)
-		return 0;
-
-	/* Exceptional situation: CS is not congested, but still has congestion queue.
-	 * This can happen f.e. when CS was congested with reads and has some writes in queue,
-	 * then all reads are complete, but writes cannot be sent because of congestion
-	 * on another CSes in chain. This is absolutely normal, we just should queue
-	 * not on this CS, but on actualle congested CSes. With current algorithm of preventing
-	 * reordering, we did a mistake and queued on node which used to be congested.
-	 * Solution for now is to retry sending with flag "requeue" set, it will requeue
-	 * requests on another nodes. It is difficult to say how frequently this happens,
-	 * so we spit out message. If we will have lots of them in logs, we have to select
-	 * different solution.
-	 */
-
-	TRACE("CS#" NODE_FMT " is free, but still has queue", NODE_ARGS(cs->id));
-	pcs_cs_flush_cong_queue(cs);
-
-	return 1;
+	return cs->in_flight < cs->eff_cwnd;
 }
 
+
 static int pcs_cs_still_congested(struct pcs_cs * cs)
 {
-	int ret;
+	int res;
 
 	spin_lock(&cs->lock);
-	ret = __pcs_cs_still_congested(cs);
+	res = __pcs_cs_still_congested(cs);
 	spin_unlock(&cs->lock);
-	return ret;
+
+	return res;
 }
 
 void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
@@ -1518,7 +1514,7 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 	if (ireq->type == PCS_IREQ_FLUSH || (pcs_req_direction(ireq->iochunk.cmd) && !(ireq->flags & IREQ_F_MAPPED))) {
 		int i;
-		int requeue = 0;
+		int requeue;
 
 		for (i = csl->nsrv - 1; i >= 0; i--) {
 			if (!match_id || csl->cs[i].cslink.cs->id.val == match_id)
@@ -1538,14 +1534,14 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 
 		do {
 			for (i = csl->nsrv - 1; i >= 0; i--)
-				pcs_cs_wakeup(csl->cs[i].cslink.cs, requeue);
+				pcs_cs_wakeup(csl->cs[i].cslink.cs);
 
 			requeue = 0;
 			for (i = csl->nsrv - 1; i >= 0; i--)
-				requeue += pcs_cs_still_congested(csl->cs[i].cslink.cs);
+				requeue |= pcs_cs_still_congested(csl->cs[i].cslink.cs);
 		} while (requeue);
 	} else {
-		int requeue = 0;
+		int requeue;
 		struct pcs_cs * rcs = csl->cs[ireq->iochunk.cs_index].cslink.cs;
 
 		if (ireq->flags & IREQ_F_SEQ_READ) {
@@ -1557,7 +1553,7 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 		pcs_cs_deaccount(ireq, rcs, error);
 
 		do {
-			pcs_cs_wakeup(rcs, requeue);
+			pcs_cs_wakeup(rcs);
 			requeue = pcs_cs_still_congested(rcs);
 		} while (requeue);
@@ -1771,6 +1767,10 @@ pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign)
 	sreq->iochunk.map = ireq->iochunk.map;
 	if (sreq->iochunk.map)
 		__pcs_map_get(sreq->iochunk.map);
+	INIT_LIST_HEAD(&sreq->tok_list);
+	BUG_ON(!list_empty(&ireq->tok_list));
+	sreq->tok_reserved = ireq->tok_reserved;
+	sreq->tok_serno = ireq->tok_serno;
 	sreq->iochunk.flow = pcs_flow_get(ireq->iochunk.flow);
 	sreq->iochunk.cmd = ireq->iochunk.cmd;
 	sreq->iochunk.role = ireq->iochunk.role;
@@ -1803,7 +1803,7 @@ pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign)
 	return sreq;
 }
 
-static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
 	struct pcs_cluster_core *cc = ireq->cc;
 	struct pcs_cs * cs;
@@ -1872,9 +1872,8 @@ static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_li
 		spin_unlock(&cs->lock);
 
 		if (allot < 0) {
-			pcs_cs_cong_enqueue(ireq, cs);
-
-			return 0;
+			if (pcs_cs_cong_enqueue_cond(ireq, cs))
+				return 0;
 		}
 
 		if (allot < ireq->dentry->cluster->cfg.curr.lmss)
@@ -1924,77 +1923,131 @@ static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_li
 			return 0;
 
 		if (allot < 0) {
-			pcs_cs_cong_enqueue(ireq, cs);
-			return 0;
+			if (pcs_cs_cong_enqueue_cond(ireq, cs))
+				return 0;
 		}
 	}
 }
 
-static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int ireq_queue_tokens(struct pcs_int_request * ireq, struct pcs_cs_list * csl)
+{
+	int i;
+	int queued = 0;
+	struct list_head drop;
+	struct pcs_int_request * toks[csl->nsrv];
+
+	INIT_LIST_HEAD(&drop);
+
+	for (i = 0; i < csl->nsrv; i++) {
+		struct pcs_int_request * ntok;
+
+		/* ireq is private; no need to lock tok_* fields */
+
+		if (ireq->tok_reserved & (1ULL << i)) {
+			toks[i] = NULL;
+			continue;
+		}
+
+		ntok = ireq_alloc(ireq->dentry);
+		BUG_ON(!ntok);
+		ntok->type = PCS_IREQ_TOKEN;
+		ntok->token.parent = ireq;
+		ntok->token.cs_index = i;
+		toks[i] = ntok;
+	}
+
+	/* Publish tokens in CS queues */
+	spin_lock(&ireq->completion_data.child_lock);
+	for (i = 0; i < csl->nsrv; i++) {
+		if (toks[i]) {
+			struct pcs_cs * cs = csl->cs[i].cslink.cs;
+			if (pcs_cs_cong_enqueue_cond(toks[i], cs)) {
+				list_add(&toks[i]->token.tok_link, &ireq->tok_list);
+				toks[i] = NULL;
+				queued = 1;
+			} else {
+				list_add(&toks[i]->token.tok_link, &drop);
+			}
+		}
+	}
+	spin_unlock(&ireq->completion_data.child_lock);
+
+	while (!list_empty(&drop)) {
+		struct pcs_int_request * tok = list_first_entry(&drop, struct pcs_int_request, token.tok_link);
+		list_del(&tok->token.tok_link);
+		ireq_destroy(tok);
+	}
+	return queued;
+}
+
+void ireq_drop_tokens(struct pcs_int_request * ireq)
+{
+	assert_spin_locked(&ireq->completion_data.child_lock);
+
+	while (!list_empty(&ireq->tok_list)) {
+		struct pcs_int_request * tok = list_first_entry(&ireq->tok_list, struct pcs_int_request, token.tok_link);
+		tok->token.parent = NULL;
+		list_del(&tok->token.tok_link);
+	}
+}
+
+static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
 	struct pcs_cs * cs;
 	unsigned int iochunk;
 	int i;
-	int congested_idx;
-	int max_excess;
 	int allot;
+	struct pcs_cs * congested_cs = NULL;
+	u64 congested = 0;
 
 	ireq->iochunk.cs_index = 0;
 	iochunk = ireq->dentry->cluster->cfg.curr.lmss;
 
restart:
-	congested_idx = -1;
-	max_excess = 0;
 	allot = ireq->iochunk.size;
+	if (csl->serno != ireq->tok_serno)
+		ireq->tok_reserved = 0;
+	BUG_ON(!list_empty(&ireq->tok_list));
 
 	for (i = 0; i < csl->nsrv; i++) {
-		int cs_allot;
-
 		cs = csl->cs[i].cslink.cs;
 
 		if (cs_is_blacklisted(cs)) {
 			map_remote_error(ireq->iochunk.map, cs->blacklist_reason, cs->id.val);
 			TRACE("Write to " MAP_FMT " blocked by blacklist error %d, CS" NODE_FMT,
 			      MAP_ARGS(ireq->iochunk.map), cs->blacklist_reason, NODE_ARGS(cs->id));
+			spin_lock(&ireq->completion_data.child_lock);
+			ireq_drop_tokens(ireq);
+			spin_unlock(&ireq->completion_data.child_lock);
 			return -1;
 		}
 
 		spin_lock(&cs->lock);
 		cs_cwnd_use_or_lose(cs);
-		cs_allot = cs->eff_cwnd - cs->in_flight;
 		spin_unlock(&cs->lock);
 
-		if (cs_allot < 0) {
-			cs_allot = -cs_allot;
-			if (cs_allot > max_excess) {
-				congested_idx = i;
-				max_excess = cs_allot;
-			}
-		} else {
-			if (cs_allot < allot)
-				allot = cs_allot;
-		}
+		if (cs->in_flight > cs->eff_cwnd && !(ireq->tok_reserved & (1ULL << i))) {
+			congested_cs = cs;
+			congested |= (1ULL << i);
+		} else
+			ireq->tok_reserved |= (1ULL << i);
 
 		if (!(test_bit(CS_SF_LOCAL, &cs->state)))
 			iochunk = ireq->dentry->cluster->cfg.curr.wmss;
 	}
 
-	if (congested_idx >= 0) {
-		int cur_cong_idx = READ_ONCE(csl->cong_index);
+	if (allot < ireq->dentry->cluster->cfg.curr.lmss)
+		allot = ireq->dentry->cluster->cfg.curr.lmss;
+
+	if (congested) {
+		int queued;
 
-		if (cur_cong_idx >= 0 && !requeue &&
-		    (READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->cong_queue_len) ||
-		     READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->active_list_len)))
-			congested_idx = cur_cong_idx;
+		ireq->tok_serno = csl->serno;
+		if (congested & (congested - 1))
+			queued = ireq_queue_tokens(ireq, csl);
 		else
-			WRITE_ONCE(csl->cong_index, congested_idx);
-
-		pcs_cs_cong_enqueue(ireq, csl->cs[congested_idx].cslink.cs);
-		return 0;
+			queued = pcs_cs_cong_enqueue_cond(ireq, congested_cs);
+		if (queued)
+			return 0;
 	}
 
-	WRITE_ONCE(csl->cong_index, -1);
-
-	if (allot < ireq->dentry->cluster->cfg.curr.lmss)
-		allot = ireq->dentry->cluster->cfg.curr.lmss;
-
 	for (;;) {
 		struct pcs_int_request * sreq = ireq;
@@ -2048,61 +2101,55 @@ static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_l
 	}
 }
 
-static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
 	struct pcs_cs * cs;
 	int i;
-	int congested_idx;
-	int max_excess;
 	int allot = PCS_CS_FLUSH_WEIGHT;
 	struct pcs_msg * msg;
 	struct pcs_cs_iohdr * ioh;
+	u64 congested = 0;
+	struct pcs_cs * congested_cs = NULL;
 
-	congested_idx = -1;
-	max_excess = 0;
+	if (csl->serno != ireq->tok_serno)
+		ireq->tok_reserved = 0;
+	BUG_ON(!list_empty(&ireq->tok_list));
 
 	for (i = 0; i < csl->nsrv; i++) {
-		int cs_allot;
-
 		cs = csl->cs[i].cslink.cs;
 
 		if (cs_is_blacklisted(cs)) {
 			map_remote_error(ireq->flushreq.map, cs->blacklist_reason, cs->id.val);
 			TRACE("Flush to " MAP_FMT " blocked by blacklist error %d, CS" NODE_FMT,
 			      MAP_ARGS(ireq->flushreq.map), cs->blacklist_reason, NODE_ARGS(cs->id));
+			spin_lock(&ireq->completion_data.child_lock);
+			ireq_drop_tokens(ireq);
+			spin_unlock(&ireq->completion_data.child_lock);
 			return -1;
 		}
 
 		spin_lock(&cs->lock);
 		cs_cwnd_use_or_lose(cs);
-		cs_allot = cs->eff_cwnd - cs->in_flight;
 		spin_unlock(&cs->lock);
-
-		if (cs_allot < 0) {
-			cs_allot = -cs_allot;
-			if (cs_allot > max_excess) {
-				congested_idx = i;
-				max_excess = cs_allot;
-			}
-		}
+		if (cs->in_flight > cs->eff_cwnd && !(ireq->tok_reserved & (1ULL << i))) {
+			congested_cs = cs;
+			congested |= (1ULL << i);
+		} else
+			ireq->tok_reserved |= (1ULL << i);
 	}
 
-	if (congested_idx >= 0) {
-		int cur_cong_idx = READ_ONCE(csl->cong_index);
+	if (congested) {
+		int queued;
 
-		if (cur_cong_idx >= 0 && !requeue &&
-		    (READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->cong_queue_len) ||
-		     READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->active_list_len)))
-			congested_idx = cur_cong_idx;
+		ireq->tok_serno = csl->serno;
+		if (congested & (congested - 1))
+			queued = ireq_queue_tokens(ireq, csl);
 		else
-			WRITE_ONCE(csl->cong_index, congested_idx);
-
-		pcs_cs_cong_enqueue(ireq, csl->cs[congested_idx].cslink.cs);
-		return 0;
+			queued = pcs_cs_cong_enqueue_cond(ireq, congested_cs);
+		if (queued)
+			return 0;
 	}
 
-	WRITE_ONCE(csl->cong_index, -1);
-
 	for (i = 0; i < csl->nsrv; i++) {
 		cs = csl->cs[i].cslink.cs;
 		cs_increment_in_flight(cs, allot);
@@ -2137,25 +2184,25 @@ static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_l
 
-int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl, int requeue)
+int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl)
 {
 	BUG_ON(!atomic_read(&csl->refcnt));
 
 	if (ireq->type == PCS_IREQ_FLUSH) {
-		return pcs_cslist_submit_flush(ireq, csl, requeue);
+		return pcs_cslist_submit_flush(ireq, csl);
 	} else if (!pcs_req_direction(ireq->iochunk.cmd)) {
-		return pcs_cslist_submit_read(ireq, csl, requeue);
+		return pcs_cslist_submit_read(ireq, csl);
 	} else if (ireq->flags & IREQ_F_MAPPED) {
 		BUG();
 		return -EIO;
 	} else {
-		return pcs_cslist_submit_write(ireq, csl, requeue);
+		return pcs_cslist_submit_write(ireq, csl);
 	}
 	BUG();
 	return -EIO;
 }
 
-void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requeue)
+void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq)
 {
 	int direction;
 	int done;
@@ -2235,7 +2282,7 @@ void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq)
 		if (direction && ireq->type != PCS_IREQ_FLUSH)
 			ireq->dentry->local_mtime = get_real_time_ms();
 
-		done = !pcs_cslist_submit(ireq, csl, requeue);
+		done = !pcs_cslist_submit(ireq, csl);
 		cslist_put(csl);
 	} while (!done);
 }
@@ -2690,7 +2737,7 @@ void process_flush_req(struct pcs_int_request *ireq)
 		goto done;
 	}
 	spin_unlock(&m->lock);
-	map_submit(m, ireq, 0);
+	map_submit(m, ireq);
 	return;
 
 done:
@@ -2888,6 +2935,8 @@ static int prepare_map_flush_ireq(struct pcs_map_entry *m, struct pcs_int_reques
 	}
 	prepare_map_flush_msg(m, sreq, msg);
 	sreq->type = PCS_IREQ_FLUSH;
+	INIT_LIST_HEAD(&sreq->tok_list);
+	sreq->tok_reserved = 0;
 	sreq->ts = ktime_get();
 	sreq->completion_data.parent = NULL;
 	sreq->flushreq.map = m;
@@ -2922,7 +2971,7 @@ static void sync_timer_work(struct work_struct *w)
 		map_sync_work_add(m, HZ);
 	} else {
 		if (sreq)
-			map_submit(m, sreq, 0);
+			map_submit(m, sreq);
 	}
 	/* Counter part from map_sync_work_add */
 	pcs_map_put(m);
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 11176b2b80d5..c60e72b365d3 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -85,13 +85,13 @@ struct pcs_cs_list
 	atomic_t		refcnt;
 	atomic_t		seq_read_in_flight;
 	int			read_index;		/* volatile read hint */
-	int			cong_index;		/* volatile cong hint */
 	unsigned long		blacklist;		/* Atomic bit field */
 	abs_time_t		blacklist_expires;	/* volatile blacklist stamp */
 	abs_time_t		select_stamp;		/* volatile read hint stamp */
 	/* members below are immutable accross cslist life time */
#define CSL_FL_HAS_LOCAL	1
 	unsigned int		flags;
+	u64			serno;
 	int			read_timeout;
 	int			write_timeout;
 	int			nsrv;
@@ -165,7 +165,7 @@ void pcs_mapping_truncate(struct pcs_int_request *ireq, u64 old_size);
 void process_ireq_truncate(struct pcs_int_request *ireq);
 
 struct pcs_map_entry * pcs_find_get_map(struct pcs_dentry_info * de, u64 chunk);
-void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requeue);
+void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq);
 void map_notify_iochunk_error(struct pcs_int_request *ireq);
 void map_notify_soft_error(struct pcs_int_request *ireq);
 void __pcs_map_put(struct pcs_map_entry *m);
@@ -182,7 +182,7 @@ void pcs_map_verify_sync_state(struct pcs_dentry_info * de, struct pcs_int_reque
 void map_inject_flush_req(struct pcs_int_request *ireq);
 void process_flush_req(struct pcs_int_request *ireq);
 int map_check_limit(struct pcs_map_entry * map, struct pcs_int_request *ireq);
-int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl, int requeue);
+int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl);
 struct pcs_int_request * pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign);
 int fuse_map_resolve(struct pcs_map_entry * m, int direction);
 struct pcs_ioc_getmap;
@@ -190,6 +190,7 @@ void pcs_map_complete(struct pcs_map_entry *m, struct pcs_ioc_getmap *omap);
 int pcs_map_encode_req(struct pcs_map_entry*m, struct pcs_ioc_getmap *map, int direction);
 void map_truncate_tail(struct pcs_mapping *mapping, u64 offset);
 unsigned long pcs_map_shrink_scan(struct shrinker *, struct shrink_control *sc);
+void ireq_drop_tokens(struct pcs_int_request * ireq);
 
 #define MAP_FMT	"(%p) 0x%lld s:%x" DENTRY_FMT
 #define MAP_ARGS(m)	(m), (long long)(m)->index, (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index fba74e9c4a56..6f49018e3988 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -28,6 +28,8 @@ enum
 	PCS_IREQ_CUSTOM	= 16,	/* generic request */
 	PCS_IREQ_WRAID	= 17,	/* compound raid6 write request */
 	PCS_IREQ_RRAID	= 18,	/* compound raid6 read request */
+	PCS_IREQ_GETMAP	= 19,	/* get mapping for kdirect mode */
+	PCS_IREQ_TOKEN	= 20,	/* dummy token to allocate congestion window */
 	PCS_IREQ_KAPI	= 65	/* IO request from kernel API */
 };
 
@@ -86,6 +88,13 @@ struct pcs_int_request
 	 */
 	struct work_struct	worker;
 
+	/* The following tok_* fields are sequenced by completion_data.child_lock
+	 * NOTE: cs->lock can be taken under this lock.
+	 */
+	struct list_head	tok_list;
+	u64			tok_reserved;
+	u64			tok_serno;
+
 	union {
 		struct {
 			struct pcs_map_entry	*map;
@@ -112,6 +121,12 @@ struct pcs_int_request
 			struct pcs_msg	*msg;
 		} flushreq;
 
+		struct {
+			struct pcs_int_request	*parent;
+			struct list_head	tok_link;
+			int			cs_index;
+		} token;
+
 		struct {
 			u64	offset;
 			int	phase;

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel
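For readers new to this code, below is a standalone, simplified sketch of the
token scheme the commit message describes. All names here (struct cs,
cong_enqueue_cond(), queue_tokens()) are illustrative stand-ins for
struct pcs_cs, pcs_cs_cong_enqueue_cond() and ireq_queue_tokens(); it is
userspace pthread code written for clarity, not the kernel implementation
above.

#include <pthread.h>
#include <stdio.h>

struct cs {			/* stand-in for struct pcs_cs */
	pthread_mutex_t lock;
	int in_flight;		/* requests currently on the wire */
	int eff_cwnd;		/* effective congestion window */
	int cong_queue_len;	/* requests parked until the window opens */
};

/* Core of the race fix: the window check and the enqueue happen under
 * one lock, so cwnd can no longer drop to zero between an unlocked
 * check and the enqueue. Returns 1 if the request was parked, 0 if the
 * caller may submit immediately. */
static int cong_enqueue_cond(struct cs *cs)
{
	int queued = 0;

	pthread_mutex_lock(&cs->lock);
	if (cs->in_flight >= cs->eff_cwnd) {
		cs->cong_queue_len++;	/* the kernel links the ireq here */
		queued = 1;
	}
	pthread_mutex_unlock(&cs->lock);
	return queued;
}

/* Token fan-out for replicated writes: park one dummy "token" per
 * congested server (cf. PCS_IREQ_TOKEN) and record a bit in *reserved
 * for every server that has already granted a slot of its window. The
 * write is submitted only when no token remains queued. */
static int queue_tokens(struct cs **srv, int nsrv, unsigned long long *reserved)
{
	int i, queued = 0;

	for (i = 0; i < nsrv; i++) {
		if (*reserved & (1ULL << i))
			continue;		/* slot granted earlier */
		if (cong_enqueue_cond(srv[i]))
			queued = 1;		/* token parked on srv[i] */
		else
			*reserved |= 1ULL << i;	/* not congested: take slot */
	}
	return queued;				/* 0: submit right away */
}

int main(void)
{
	struct cs a = { PTHREAD_MUTEX_INITIALIZER, 4, 8, 0 };	/* free */
	struct cs b = { PTHREAD_MUTEX_INITIALIZER, 9, 8, 0 };	/* congested */
	struct cs *chain[2] = { &a, &b };
	unsigned long long reserved = 0;

	if (queue_tokens(chain, 2, &reserved))
		printf("parked, waiting for tokens (reserved=%llx)\n", reserved);
	else
		printf("submit immediately\n");
	return 0;
}

When a token is released (pcs_cs_wakeup() in the patch), the corresponding
bit is set in the parent's tok_reserved and the submission retries; the
csl->serno check guards against reusing reservations that were made against
a stale CS list.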