The commit is pushed to "branch-rh7-3.10.0-693.21.1.vz7.50.x-ovz" and will 
appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-693.21.1.vz7.50.1
------>
commit 785731c50a55a452f5ee0aecc0fa438c7edfba59
Author: Alexey Kuznetsov <kuz...@virtuozzo.com>
Date:   Wed May 23 11:04:18 2018 +0300

    fuse kio_pcs: ports from new user-space
    
    New token-based congestion avoidance is ported from user-space to the
    kernel.
    
    Also, some bugs in the fuse_kio_pcs congestion avoidance are fixed.
    There was a race condition: cwnd could drop to 0 from another thread,
    leaving the congestion queue stuck. The race is now resolved by moving
    the cwnd check under cs->lock; if the cs turns out not to be congested
    while cs->lock is held, the request is submitted immediately.
    
    Signed-off-by: Alexey Kuznetsov <kuz...@virtuozzo.com>
    Signed-off-by: Kirill Tkhai <ktk...@virtuozzo.com>
---
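Note for reviewers: the essence of the race fix is that the congestion check
and the enqueue now form a single critical section. The old
pcs_cs_cong_enqueue() queued unconditionally after an unlocked window check,
so another thread could shrink cwnd to 0 (or drain the last in-flight
request) in between, leaving the request parked on cong_queue with no wakeup
to follow. A minimal sketch of the new pattern (simplified from
pcs_cs_cong_enqueue_cond() below, not the literal kernel code):

	int cong_enqueue_cond(struct pcs_cs *cs, struct pcs_int_request *ireq)
	{
		int queued = 0;

		spin_lock(&cs->lock);
		if (cs->in_flight >= cs->eff_cwnd) {	/* re-check under cs->lock */
			list_add_tail(&ireq->list, &cs->cong_queue);
			cs->cong_queue_len++;
			queued = 1;
		}
		spin_unlock(&cs->lock);
		return queued;	/* 0: window is open, caller submits immediately */
	}
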
 fs/fuse/kio/pcs/pcs_cluster.c      |  31 +++-
 fs/fuse/kio/pcs/pcs_cs.c           |  25 ++--
 fs/fuse/kio/pcs/pcs_cs.h           |  15 +-
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  10 +-
 fs/fuse/kio/pcs/pcs_map.c          | 287 ++++++++++++++++++++++---------------
 fs/fuse/kio/pcs/pcs_map.h          |   7 +-
 fs/fuse/kio/pcs/pcs_req.h          |  15 ++
 7 files changed, 241 insertions(+), 149 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index 79071655f6e5..24ec8a5f39a3 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -18,6 +18,7 @@
 #include "../../fuse_i.h"
 
 void pcs_cc_process_ireq_chunk(struct pcs_int_request *ireq);
+static void ireq_process_(struct pcs_int_request *ireq);
 
 static inline int is_file_inline(struct pcs_dentry_info *di)
 {
@@ -226,6 +227,9 @@ static int fiemap_worker(void * arg)
 
                sreq->dentry = di;
                sreq->type = PCS_IREQ_IOCHUNK;
+               INIT_LIST_HEAD(&sreq->tok_list);
+               sreq->tok_reserved = 0;
+               sreq->tok_serno = 0;
                sreq->iochunk.map = NULL;
                sreq->iochunk.flow = pcs_flow_record(&di->mapping.ftab, 0, pos, end-pos, &di->cluster->maps.ftab);
                sreq->iochunk.cmd = PCS_REQ_T_FIEMAP;
@@ -280,7 +284,7 @@ void pcs_cc_process_ireq_chunk(struct pcs_int_request *ireq)
                pcs_map_put(ireq->iochunk.map);
        ireq->iochunk.map = map;
 
-       map_submit(map, ireq, 0);
+       map_submit(map, ireq);
 }
 
 /* TODO Remove noinline in production */
@@ -325,6 +329,9 @@ static noinline void __pcs_cc_process_ireq_rw(struct pcs_int_request *ireq)
 
                sreq->dentry = di;
                sreq->type = PCS_IREQ_IOCHUNK;
+               INIT_LIST_HEAD(&sreq->tok_list);
+               sreq->tok_reserved = 0;
+               sreq->tok_serno = 0;
                sreq->iochunk.map = NULL;
                sreq->iochunk.flow = pcs_flow_get(fl);
                sreq->iochunk.cmd = ireq->apireq.req->type;
@@ -391,6 +398,25 @@ static void pcs_cc_process_ireq_ioreq(struct pcs_int_request *ireq)
        return __pcs_cc_process_ireq_rw(ireq);
 }
 
+static void process_ireq_token(struct pcs_int_request * ireq)
+{
+       struct pcs_int_request * parent = ireq->token.parent;
+
+       if (parent) {
+               int do_execute = 0;
+
+               spin_lock(&parent->completion_data.child_lock);
+               if (ireq->token.parent) {
+                       ireq_drop_tokens(parent);
+                       do_execute = 1;
+               }
+               spin_unlock(&parent->completion_data.child_lock);
+               if (do_execute)
+                       ireq_process_(parent);
+       }
+       ireq_destroy(ireq);
+}
+
 static void ireq_process_(struct pcs_int_request *ireq)
 {
        struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
@@ -425,6 +451,9 @@ static void ireq_process_(struct pcs_int_request *ireq)
        case PCS_IREQ_CUSTOM:
                ireq->custom.action(ireq);
                break;
+       case PCS_IREQ_TOKEN:
+               process_ireq_token(ireq);
+               break;
        default:
                BUG();
                break;
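
A PCS_IREQ_TOKEN is a dummy request that waits on a congested CS queue on
behalf of its parent write or flush. The handler above is the fallback path
(e.g. the token surfaces through the generic state machine on an error): it
drops all sibling tokens and re-runs the parent. The normal path lives in
pcs_cs_wakeup() in pcs_map.c. Roughly (a sketch of the lifecycle, not new
code):

	/*
	 * submit_write/flush finds several congested CSes
	 *     -> ireq_queue_tokens() parks one PCS_IREQ_TOKEN per congested CS
	 * pcs_cs_wakeup() pops a token once a CS window opens
	 *     -> parent->tok_reserved |= 1ULL << token->token.cs_index
	 *     -> when the last token is gone, re-run the parent
	 * a token reaches ireq_process_() instead (this file)
	 *     -> ireq_drop_tokens(parent) and re-run the parent
	 */
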
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 151b06cb18cf..cae591e65a42 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -694,8 +694,8 @@ void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
        if (cs == NULL)
                return;
 
+       list_splice_tail_init(&cs->active_list, &queue);
        list_splice_tail_init(&cs->cong_queue, &queue);
-       clear_bit(CS_SF_CONGESTED, &cs->state);
        cs->cong_queue_len = 0;
        cs_blacklist(cs, err->value, "notify error");
        spin_unlock(&cs->lock);
@@ -710,9 +710,7 @@ static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose)
 
        list_splice_tail_init(&cs->active_list, dispose);
        list_splice_tail_init(&cs->cong_queue, dispose);
-       cs->active_list_len = 0;
        cs->cong_queue_len = 0;
-       clear_bit(CS_SF_CONGESTED, &cs->state);
 
        cs->is_dead = 1;
        spin_lock(&cs->css->lock);
@@ -920,7 +918,7 @@ void cs_decrement_in_flight(struct pcs_cs *cs, unsigned int to_dec)
 
        if (cs->in_flight < cs->eff_cwnd) {
                cs->cwr_state = 0;
-               pcs_cs_flush_cong_queue(cs);
+               pcs_cs_activate_cong_queue(cs);
        }
        if (cs->in_flight == 0)
                cs->idle_stamp = jiffies;
@@ -1066,6 +1064,7 @@ void pcs_csset_init(struct pcs_cs_set *css)
        INIT_DELAYED_WORK(&css->bl_work, bl_timer_work);
        css->ncs = 0;
        spin_lock_init(&css->lock);
+       atomic64_set(&css->csl_serno_gen, 0);
 }
 
 void pcs_csset_fini(struct pcs_cs_set *css)
@@ -1146,14 +1145,18 @@ void pcs_cs_set_stat_up(struct pcs_cs_set *set)
        pcs_cs_for_each_entry(set, do_update_stat, 0);
 }
 
-void pcs_cs_cong_enqueue(struct pcs_int_request *ireq, struct pcs_cs *cs)
+int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs)
 {
+       int queued = 0;
+
        spin_lock(&cs->lock);
-       if (test_bit(CS_SF_CONGESTED, &cs->state))
-               test_bit(CS_SF_CONGESTED, &cs->state);
-       list_add_tail(&ireq->list, &cs->cong_queue);
-       cs->cong_queue_len++;
-       if (!ireq->qdepth)
-               ireq->qdepth = cs->cong_queue_len + cs->active_list_len;
+       if (cs->in_flight >= cs->eff_cwnd) {
+               list_add_tail(&ireq->list, &cs->cong_queue);
+               cs->cong_queue_len++;
+               if (!ireq->qdepth)
+                       ireq->qdepth = cs->cong_queue_len;
+               queued = 1;
+       }
        spin_unlock(&cs->lock);
+       return queued;
 }
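
All call sites follow the same shape now: a refused enqueue means the window
opened while we were deciding, so the caller falls through and submits. The
pattern, as used by pcs_cslist_submit_read()/_write()/_flush() in pcs_map.c:

	if (allot < 0) {
		if (pcs_cs_cong_enqueue_cond(ireq, cs))
			return 0;	/* parked; the CS will wake us up */
		/* enqueue refused: CS became uncongested meanwhile,
		 * fall through and submit immediately */
	}
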
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index f46b31f2633d..eb81ac51f3ae 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -36,7 +36,6 @@ enum {
        CS_SF_FAILED,
        CS_SF_BLACKLISTED,
        CS_SF_ACTIVE,
-       CS_SF_CONGESTED,
 };
 
 struct pcs_cs {
@@ -66,14 +65,13 @@ struct pcs_cs {
        struct list_head        cong_queue;
        int                     cong_queue_len;
        struct list_head        active_list;
-       int                     active_list_len;
 
        pcs_cs_io_prio_t        io_prio;
        pcs_cs_net_prio_t       net_prio;
        u8                      mds_flags;
        abs_time_t              io_prio_stamp;
 
-       struct list_head                flow_lru;
+       struct list_head        flow_lru;
        int                     nflows;
 
        unsigned long           state;
@@ -104,24 +102,20 @@ static inline void pcs_cs_init_cong_queue(struct pcs_cs *cs)
 {
        INIT_LIST_HEAD(&cs->cong_queue);
        cs->cong_queue_len = 0;
-       clear_bit(CS_SF_CONGESTED, &cs->state);
 }
 
 static inline void pcs_cs_init_active_list(struct pcs_cs *cs)
 {
        INIT_LIST_HEAD(&cs->active_list);
-       cs->active_list_len = 0;
 }
 
-static inline void pcs_cs_flush_cong_queue(struct pcs_cs *cs)
+static inline void pcs_cs_activate_cong_queue(struct pcs_cs *cs)
 {
        assert_spin_locked(&cs->lock);
-       list_splice_tail(&cs->cong_queue, &cs->active_list);
-       cs->active_list_len += cs->cong_queue_len;
-       pcs_cs_init_cong_queue(cs);
+       list_splice_tail_init(&cs->cong_queue, &cs->active_list);
 }
 
-void pcs_cs_cong_enqueue(struct pcs_int_request *ireq, struct pcs_cs *cs);
+int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs);
 
 #define PCS_CS_HASH_SIZE 1024
 
@@ -132,6 +126,7 @@ struct pcs_cs_set {
        struct delayed_work     bl_work;
        unsigned int            ncs;
        spinlock_t              lock;
+       atomic64_t              csl_serno_gen;
 };
 
 void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq);
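
csl_serno_gen stamps every pcs_cs_list with a unique serial (cslist_alloc()
in pcs_map.c does the increment). A request remembers which CS list its
tok_reserved bitmap was earned against; if the map's CS list has been
replaced, the serials no longer match and the stale reservations are
discarded before resubmission:

	if (csl->serno != ireq->tok_serno)
		ireq->tok_reserved = 0;	/* bits were earned against an older csl */
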
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index f4c0fb15403e..61378f0d9a58 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -843,11 +843,6 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
                if (inarg->offset >= di->fileinfo.attr.size)
                        inarg->mode &= ~FALLOC_FL_ZERO_RANGE;
 
-               if (inarg->mode & FALLOC_FL_KEEP_SIZE) {
-                       if (inarg->offset + inarg->length > di->fileinfo.attr.size)
-                               inarg->length = di->fileinfo.attr.size - inarg->offset;
-               }
-
                if (inarg->mode & (FALLOC_FL_ZERO_RANGE|FALLOC_FL_PUNCH_HOLE)) {
                        if ((inarg->offset & (PAGE_SIZE - 1)) || (inarg->length & (PAGE_SIZE - 1))) {
                                r->req.out.h.error = -EINVAL;
@@ -855,6 +850,11 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
                        }
                }
 
+               if (inarg->mode & FALLOC_FL_KEEP_SIZE) {
+                       if (inarg->offset + inarg->length > di->fileinfo.attr.size)
+                               inarg->length = di->fileinfo.attr.size - inarg->offset;
+               }
+
                ret = pcs_fuse_prep_rw(r);
                if (!ret)
                        goto submit;
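
Moving the FALLOC_FL_KEEP_SIZE clamp below the alignment check changes
behavior for requests extending past EOF. A worked example (hypothetical
numbers): attr.size = 41472 (ten 4K pages plus 512 bytes), mode =
FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, offset = 40960, length = 4096,
both page-aligned. Previously the clamp ran first and shrank length to 512,
so the alignment check rejected the request with -EINVAL although the caller
had passed perfectly aligned values. Now alignment is validated on the
caller-supplied values, which pass, and only then is length clamped to 512
for the actual operation.
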
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 085f49fd0219..90e311f7e995 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -911,8 +911,8 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
        atomic_set(&cs_list->refcnt, 1);
        atomic_set(&cs_list->seq_read_in_flight, 0);
        cs_list->read_index = -1;
-       cs_list->cong_index = -1 ;
        cs_list->flags = 0;
+       cs_list->serno = atomic64_inc_return(&css->csl_serno_gen);
        cs_list->blacklist = 0;
        cs_list->read_timeout = (read_tout * HZ) / 1000;
        cs_list->write_timeout = (write_tout * HZ) / 1000;
@@ -1380,7 +1380,7 @@ static void pcs_cs_deaccount(struct pcs_int_request *ireq, struct pcs_cs * cs, i
        spin_unlock(&cs->lock);
 }
 
-static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
+static void pcs_cs_wakeup(struct pcs_cs * cs)
 {
        struct pcs_int_request * sreq;
        struct pcs_map_entry * map;
@@ -1393,11 +1393,32 @@ static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
                        break;
                }
                sreq = list_first_entry(&cs->active_list, struct pcs_int_request, list);
-               BUG_ON(!cs->active_list_len);
                list_del_init(&sreq->list);
-               cs->active_list_len--;
+               cs->cong_queue_len--;
                spin_unlock(&cs->lock);
 
+               if (sreq->type == PCS_IREQ_TOKEN) {
+                       struct pcs_int_request * parent = sreq->token.parent;
+                       int do_execute = 0;
+
+                       if (parent == NULL) {
+                               ireq_destroy(sreq);
+                               continue;
+                       }
+
+                       spin_lock(&parent->completion_data.child_lock);
+                       if (sreq->token.parent) {
+                               parent->tok_reserved |= (1ULL << sreq->token.cs_index);
+                               list_del(&sreq->token.tok_link);
+                               do_execute = list_empty(&parent->tok_list);
+                       }
+                       spin_unlock(&parent->completion_data.child_lock);
+                       ireq_destroy(sreq);
+                       if (!do_execute)
+                               continue;
+                       sreq = parent;
+               }
+
                if (sreq->type != PCS_IREQ_FLUSH) {
                        map = pcs_find_get_map(sreq->dentry, sreq->iochunk.chunk +
                                                   ((sreq->flags & IREQ_F_MAPPED) ? 0 : sreq->iochunk.offset));
@@ -1412,7 +1433,7 @@ static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
                                                         preq->apireq.req->pos, preq->apireq.req->size,
                                                         &sreq->cc->maps.ftab);
                                }
-                               map_submit(map, sreq, requeue);
+                               map_submit(map, sreq);
                        } else {
                                map_queue_on_limit(sreq);
                        }
@@ -1422,58 +1443,33 @@ static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
                                pcs_clear_error(&sreq->error);
                                ireq_complete(sreq);
                        } else
-                               map_submit(map, sreq, requeue);
+                               map_submit(map, sreq);
                }
        }
 }
 
 static int __pcs_cs_still_congested(struct pcs_cs * cs)
 {
+       if (!list_empty(&cs->active_list))
+               list_splice_tail_init(&cs->active_list, &cs->cong_queue);
 
-       assert_spin_locked(&cs->lock);
-
-       if (!list_empty(&cs->active_list)) {
-               BUG_ON(!cs->active_list_len);
-               list_splice_tail(&cs->active_list, &cs->cong_queue);
-               cs->cong_queue_len += cs->active_list_len;
-               set_bit(CS_SF_CONGESTED, &cs->state);
-               pcs_cs_init_active_list(cs);
-       } else if (list_empty(&cs->cong_queue)) {
+       if (list_empty(&cs->cong_queue)) {
                BUG_ON(cs->cong_queue_len);
-               BUG_ON(test_bit(CS_SF_CONGESTED, &cs->state));
                return 0;
-       } else {
-               BUG_ON(cs->active_list_len);
        }
 
-       if (cs->in_flight >= cs->eff_cwnd)
-               return 0;
-
-       /* Exceptional situation: CS is not congested, but still has congestion queue.
-        * This can happen f.e. when CS was congested with reads and has some writes in queue,
-        * then all reads are complete, but writes cannot be sent because of congestion
-        * on another CSes in chain. This is absolutely normal, we just should queue
-        * not on this CS, but on actualle congested CSes. With current algorithm of preventing
-        * reordering, we did a mistake and queued on node which used to be congested.
-        * Solution for now is to retry sending with flag "requeue" set, it will requeue
-        * requests on another nodes. It is difficult to say how frequently this happens,
-        * so we spit out message. If we will have lots of them in logs, we have to select
-        * different solution.
-        */
-
-       TRACE("CS#" NODE_FMT " is free, but still has queue", NODE_ARGS(cs->id));
-       pcs_cs_flush_cong_queue(cs);
-
-       return 1;
+       return cs->in_flight < cs->eff_cwnd;
 }
+
 static int pcs_cs_still_congested(struct pcs_cs * cs)
 {
-       int ret;
+       int res;
 
        spin_lock(&cs->lock);
-       ret = __pcs_cs_still_congested(cs);
+       res = __pcs_cs_still_congested(cs);
        spin_unlock(&cs->lock);
-       return ret;
+
+       return res;
 }
 
 void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
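
__pcs_cs_still_congested() no longer tries to move a stale queue onto a
different node; it simply merges active_list back into cong_queue and reports
whether this CS still holds queued work while its window is open. The callers
in pcs_deaccount_ireq() (next hunk) keep the wakeup/retry loop, accumulating
a boolean with |= instead of a counter:

	do {
		for (i = csl->nsrv - 1; i >= 0; i--)
			pcs_cs_wakeup(csl->cs[i].cslink.cs);

		requeue = 0;
		for (i = csl->nsrv - 1; i >= 0; i--)
			requeue |= pcs_cs_still_congested(csl->cs[i].cslink.cs);
	} while (requeue);
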
@@ -1518,7 +1514,7 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 
        if (ireq->type == PCS_IREQ_FLUSH || (pcs_req_direction(ireq->iochunk.cmd) && !(ireq->flags & IREQ_F_MAPPED))) {
                int i;
-               int requeue = 0;
+               int requeue;
 
                for (i = csl->nsrv - 1; i >= 0; i--) {
                        if (!match_id || csl->cs[i].cslink.cs->id.val == match_id)
@@ -1538,14 +1534,14 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 
                do {
                        for (i = csl->nsrv - 1; i >= 0; i--)
-                               pcs_cs_wakeup(csl->cs[i].cslink.cs, requeue);
+                               pcs_cs_wakeup(csl->cs[i].cslink.cs);
 
                        requeue = 0;
                        for (i = csl->nsrv - 1; i >= 0; i--)
-                               requeue += pcs_cs_still_congested(csl->cs[i].cslink.cs);
+                               requeue |= pcs_cs_still_congested(csl->cs[i].cslink.cs);
                } while (requeue);
        } else {
-               int requeue = 0;
+               int requeue;
                struct pcs_cs * rcs = csl->cs[ireq->iochunk.cs_index].cslink.cs;
 
                if (ireq->flags & IREQ_F_SEQ_READ) {
@@ -1557,7 +1553,7 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
                pcs_cs_deaccount(ireq, rcs, error);
 
                do {
-                       pcs_cs_wakeup(rcs, requeue);
+                       pcs_cs_wakeup(rcs);
 
                        requeue = pcs_cs_still_congested(rcs);
                } while (requeue);
@@ -1771,6 +1767,10 @@ pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign)
        sreq->iochunk.map = ireq->iochunk.map;
        if (sreq->iochunk.map)
                __pcs_map_get(sreq->iochunk.map);
+       INIT_LIST_HEAD(&sreq->tok_list);
+       BUG_ON(!list_empty(&ireq->tok_list));
+       sreq->tok_reserved = ireq->tok_reserved;
+       sreq->tok_serno = ireq->tok_serno;
        sreq->iochunk.flow = pcs_flow_get(ireq->iochunk.flow);
        sreq->iochunk.cmd = ireq->iochunk.cmd;
        sreq->iochunk.role = ireq->iochunk.role;
@@ -1803,7 +1803,7 @@ pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign)
        return sreq;
 }
 
-static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
        struct pcs_cluster_core *cc = ireq->cc;
        struct pcs_cs * cs;
@@ -1872,9 +1872,8 @@ static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_li
        spin_unlock(&cs->lock);
 
        if (allot < 0) {
-               pcs_cs_cong_enqueue(ireq, cs);
-
-               return 0;
+               if (pcs_cs_cong_enqueue_cond(ireq, cs))
+                       return 0;
        }
 
        if (allot < ireq->dentry->cluster->cfg.curr.lmss)
@@ -1924,77 +1923,131 @@ static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_li
                        return 0;
 
                if (allot < 0) {
-                       pcs_cs_cong_enqueue(ireq, cs);
-                       return 0;
+                       if (pcs_cs_cong_enqueue_cond(ireq, cs))
+                               return 0;
                }
        }
 }
 
-static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int ireq_queue_tokens(struct pcs_int_request * ireq, struct pcs_cs_list * csl)
+{
+       int i;
+       int queued = 0;
+       struct list_head drop;
+       struct pcs_int_request * toks[csl->nsrv];
+
+       INIT_LIST_HEAD(&drop);
+
+       for (i = 0; i < csl->nsrv; i++) {
+               struct pcs_int_request * ntok;
+
+               /* ireq is private; no need to lock tok_* fields */
+
+               if (ireq->tok_reserved & (1ULL << i)) {
+                       toks[i] = NULL;
+                       continue;
+               }
+
+               ntok = ireq_alloc(ireq->dentry);
+               BUG_ON(!ntok);
+               ntok->type = PCS_IREQ_TOKEN;
+               ntok->token.parent = ireq;
+               ntok->token.cs_index = i;
+               toks[i] = ntok;
+       }
+
+       /* Publish tokens in CS queues */
+       spin_lock(&ireq->completion_data.child_lock);
+       for (i = 0; i < csl->nsrv; i++) {
+               if (toks[i]) {
+                       struct pcs_cs * cs = csl->cs[i].cslink.cs;
+                       if (pcs_cs_cong_enqueue_cond(toks[i], cs)) {
+                               list_add(&toks[i]->token.tok_link, &ireq->tok_list);
+                               toks[i] = NULL;
+                               queued = 1;
+                       } else {
+                               list_add(&toks[i]->token.tok_link, &drop);
+                       }
+               }
+       }
+       spin_unlock(&ireq->completion_data.child_lock);
+
+       while (!list_empty(&drop)) {
+               struct pcs_int_request * tok = list_first_entry(&drop, struct pcs_int_request, token.tok_link);
+               list_del(&tok->token.tok_link);
+               ireq_destroy(tok);
+       }
+       return queued;
+}
+
+void ireq_drop_tokens(struct pcs_int_request * ireq)
+{
+       assert_spin_locked(&ireq->completion_data.child_lock);
+
+       while (!list_empty(&ireq->tok_list)) {
+               struct pcs_int_request * tok = list_first_entry(&ireq->tok_list, struct pcs_int_request, token.tok_link);
+               tok->token.parent = NULL;
+               list_del(&tok->token.tok_link);
+       }
+}
+
+static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
        struct pcs_cs * cs;
        unsigned int iochunk;
        int i;
-       int congested_idx;
-       int max_excess;
        int allot;
+       struct pcs_cs * congested_cs = NULL;
+       u64 congested = 0;
 
        ireq->iochunk.cs_index = 0;
        iochunk = ireq->dentry->cluster->cfg.curr.lmss;
 
 restart:
-       congested_idx = -1;
-       max_excess = 0;
        allot = ireq->iochunk.size;
+       if (csl->serno != ireq->tok_serno)
+               ireq->tok_reserved = 0;
+       BUG_ON(!list_empty(&ireq->tok_list));
 
        for (i = 0; i < csl->nsrv; i++) {
-               int cs_allot;
-
                cs = csl->cs[i].cslink.cs;
                if (cs_is_blacklisted(cs)) {
                        map_remote_error(ireq->iochunk.map, cs->blacklist_reason, cs->id.val);
                        TRACE("Write to " MAP_FMT " blocked by blacklist error %d, CS" NODE_FMT,
                              MAP_ARGS(ireq->iochunk.map), cs->blacklist_reason, NODE_ARGS(cs->id));
+                       spin_lock(&ireq->completion_data.child_lock);
+                       ireq_drop_tokens(ireq);
+                       spin_unlock(&ireq->completion_data.child_lock);
                        return -1;
                }
                spin_lock(&cs->lock);
                cs_cwnd_use_or_lose(cs);
-               cs_allot = cs->eff_cwnd - cs->in_flight;
                spin_unlock(&cs->lock);
 
-               if (cs_allot < 0) {
-                       cs_allot = -cs_allot;
-                       if (cs_allot > max_excess) {
-                               congested_idx = i;
-                               max_excess = cs_allot;
-                       }
-               } else {
-                       if (cs_allot < allot)
-                               allot = cs_allot;
-               }
+               if (cs->in_flight > cs->eff_cwnd && !(ireq->tok_reserved & (1ULL << i))) {
+                       congested_cs = cs;
+                       congested |= (1ULL << i);
+               } else
+                       ireq->tok_reserved |= (1ULL << i);
 
                if (!(test_bit(CS_SF_LOCAL, &cs->state)))
                        iochunk = ireq->dentry->cluster->cfg.curr.wmss;
        }
 
-       if (congested_idx >= 0) {
-               int cur_cong_idx = READ_ONCE(csl->cong_index);
+       if (allot < ireq->dentry->cluster->cfg.curr.lmss)
+               allot = ireq->dentry->cluster->cfg.curr.lmss;
 
+       if (congested) {
+               int queued;
 
-               if (cur_cong_idx >= 0 && !requeue &&
-                   (READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->cong_queue_len) ||
-                    READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->active_list_len)))
-                       congested_idx = cur_cong_idx;
+               ireq->tok_serno = csl->serno;
+               if (congested & (congested - 1))
+                       queued = ireq_queue_tokens(ireq, csl);
                else
-                       WRITE_ONCE(csl->cong_index, congested_idx);
-
-               pcs_cs_cong_enqueue(ireq, csl->cs[congested_idx].cslink.cs);
-               return 0;
+                       queued = pcs_cs_cong_enqueue_cond(ireq, congested_cs);
+               if (queued)
+                       return 0;
        }
-       WRITE_ONCE(csl->cong_index, -1);
-
-       if (allot < ireq->dentry->cluster->cfg.curr.lmss)
-               allot = ireq->dentry->cluster->cfg.curr.lmss;
 
        for (;;) {
                struct pcs_int_request * sreq = ireq;
@@ -2048,61 +2101,55 @@ static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_l
        }
 }
 
-static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
 {
        struct pcs_cs * cs;
        int i;
-       int congested_idx;
-       int max_excess;
        int allot = PCS_CS_FLUSH_WEIGHT;
        struct pcs_msg * msg;
        struct pcs_cs_iohdr * ioh;
+       u64 congested = 0;
+       struct pcs_cs * congested_cs = NULL;
 
-       congested_idx = -1;
-       max_excess = 0;
+       if (csl->serno != ireq->tok_serno)
+               ireq->tok_reserved = 0;
+       BUG_ON(!list_empty(&ireq->tok_list));
 
        for (i = 0; i < csl->nsrv; i++) {
-               int cs_allot;
-
                cs = csl->cs[i].cslink.cs;
 
                if (cs_is_blacklisted(cs)) {
                        map_remote_error(ireq->flushreq.map, cs->blacklist_reason, cs->id.val);
                        TRACE("Flush to " MAP_FMT " blocked by blacklist error %d, CS" NODE_FMT,
                              MAP_ARGS(ireq->flushreq.map), cs->blacklist_reason, NODE_ARGS(cs->id));
+                       spin_lock(&ireq->completion_data.child_lock);
+                       ireq_drop_tokens(ireq);
+                       spin_unlock(&ireq->completion_data.child_lock);
                        return -1;
                }
 
                spin_lock(&cs->lock);
                cs_cwnd_use_or_lose(cs);
-               cs_allot = cs->eff_cwnd - cs->in_flight;
                spin_unlock(&cs->lock);
-
-               if (cs_allot < 0) {
-                       cs_allot = -cs_allot;
-                       if (cs_allot > max_excess) {
-                               congested_idx = i;
-                               max_excess = cs_allot;
-                       }
-               }
+               if (cs->in_flight > cs->eff_cwnd && !(ireq->tok_reserved & (1ULL << i))) {
+                       congested_cs = cs;
+                       congested |= (1ULL << i);
+               } else
+                       ireq->tok_reserved |= (1ULL << i);
        }
 
-       if (congested_idx >= 0) {
-               int cur_cong_idx = READ_ONCE(csl->cong_index);
+       if (congested) {
+               int queued;
 
-               if (cur_cong_idx >= 0 && !requeue &&
-                   (READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->cong_queue_len) ||
-                    READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->active_list_len)))
-                       congested_idx = cur_cong_idx;
+               ireq->tok_serno = csl->serno;
+               if (congested & (congested - 1))
+                       queued = ireq_queue_tokens(ireq, csl);
                else
-                       WRITE_ONCE(csl->cong_index, congested_idx);
-
-               pcs_cs_cong_enqueue(ireq, csl->cs[congested_idx].cslink.cs);
-               return 0;
+                       queued = pcs_cs_cong_enqueue_cond(ireq, congested_cs);
+               if (queued)
+                       return 0;
        }
 
-       WRITE_ONCE(csl->cong_index, -1);
-
        for (i = 0; i < csl->nsrv; i++) {
                cs = csl->cs[i].cslink.cs;
                cs_increment_in_flight(cs, allot);
@@ -2137,25 +2184,25 @@ static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_l
 
 
 
-int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl, int requeue)
+int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl)
 {
        BUG_ON(!atomic_read(&csl->refcnt));
 
        if (ireq->type == PCS_IREQ_FLUSH) {
-               return pcs_cslist_submit_flush(ireq, csl, requeue);
+               return pcs_cslist_submit_flush(ireq, csl);
        } else if (!pcs_req_direction(ireq->iochunk.cmd)) {
-               return pcs_cslist_submit_read(ireq, csl, requeue);
+               return pcs_cslist_submit_read(ireq, csl);
        } else if (ireq->flags & IREQ_F_MAPPED) {
                BUG();
                return -EIO;
        } else {
-               return pcs_cslist_submit_write(ireq, csl, requeue);
+               return pcs_cslist_submit_write(ireq, csl);
        }
        BUG();
        return -EIO;
 }
 
-void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requeue)
+void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq)
 {
        int direction;
        int done;
@@ -2235,7 +2282,7 @@ void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requ
                if (direction && ireq->type != PCS_IREQ_FLUSH)
                        ireq->dentry->local_mtime = get_real_time_ms();
 
-               done = !pcs_cslist_submit(ireq, csl, requeue);
+               done = !pcs_cslist_submit(ireq, csl);
                cslist_put(csl);
        } while (!done);
 }
@@ -2690,7 +2737,7 @@ void process_flush_req(struct pcs_int_request *ireq)
                goto done;
        }
        spin_unlock(&m->lock);
-       map_submit(m, ireq, 0);
+       map_submit(m, ireq);
        return;
 
 done:
@@ -2888,6 +2935,8 @@ static int prepare_map_flush_ireq(struct pcs_map_entry *m, struct pcs_int_reques
        }
        prepare_map_flush_msg(m, sreq, msg);
        sreq->type = PCS_IREQ_FLUSH;
+       INIT_LIST_HEAD(&sreq->tok_list);
+       sreq->tok_reserved = 0;
        sreq->ts = ktime_get();
        sreq->completion_data.parent = NULL;
        sreq->flushreq.map = m;
@@ -2922,7 +2971,7 @@ static void sync_timer_work(struct work_struct *w)
                map_sync_work_add(m, HZ);
        } else {
                if (sreq)
-                       map_submit(m, sreq, 0);
+                       map_submit(m, sreq);
        }
        /* Counter part from map_sync_work_add */
        pcs_map_put(m);
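
Note that every site that hand-builds an ireq now has to initialize the
tok_* fields; this patch touches four of them (fiemap_worker,
__pcs_cc_process_ireq_rw, pcs_ireq_split, prepare_map_flush_ireq). If more
appear, a small helper would be a natural follow-up. A hypothetical sketch,
not part of this patch:

	static inline void ireq_init_tokens(struct pcs_int_request *ireq)
	{
		INIT_LIST_HEAD(&ireq->tok_list);
		ireq->tok_reserved = 0;
		ireq->tok_serno = 0;
	}
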
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 11176b2b80d5..c60e72b365d3 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -85,13 +85,13 @@ struct pcs_cs_list
        atomic_t                refcnt;
        atomic_t                seq_read_in_flight;
        int                     read_index;             /* volatile read hint */
-       int                     cong_index;             /* volatile cong hint */
        unsigned long           blacklist;              /* Atomic bit field */
        abs_time_t              blacklist_expires;      /* volatile blacklist stamp */
        abs_time_t              select_stamp;           /* volatile read hint stamp */
        /* members below are immutable accross cslist life time */
 #define CSL_FL_HAS_LOCAL       1
        unsigned int            flags;
+       u64                     serno;
        int                     read_timeout;
        int                     write_timeout;
        int                     nsrv;
@@ -165,7 +165,7 @@ void pcs_mapping_truncate(struct pcs_int_request *ireq, u64 old_size);
 void process_ireq_truncate(struct pcs_int_request *ireq);
 
 struct pcs_map_entry * pcs_find_get_map(struct pcs_dentry_info * de, u64 chunk);
-void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requeue);
+void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq);
 void map_notify_iochunk_error(struct pcs_int_request *ireq);
 void map_notify_soft_error(struct pcs_int_request *ireq);
 void __pcs_map_put(struct pcs_map_entry *m);
@@ -182,7 +182,7 @@ void pcs_map_verify_sync_state(struct pcs_dentry_info * de, struct pcs_int_reque
 void map_inject_flush_req(struct pcs_int_request *ireq);
 void process_flush_req(struct pcs_int_request *ireq);
 int map_check_limit(struct pcs_map_entry * map, struct pcs_int_request *ireq);
-int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl, int requeue);
+int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl);
 struct pcs_int_request * pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign);
 int  fuse_map_resolve(struct pcs_map_entry * m, int direction);
 struct pcs_ioc_getmap;
@@ -190,6 +190,7 @@ void pcs_map_complete(struct pcs_map_entry *m, struct pcs_ioc_getmap *omap);
 int pcs_map_encode_req(struct pcs_map_entry*m, struct pcs_ioc_getmap *map, int direction);
 void map_truncate_tail(struct pcs_mapping *mapping, u64 offset);
 unsigned long pcs_map_shrink_scan(struct shrinker *,  struct shrink_control *sc);
+void ireq_drop_tokens(struct pcs_int_request * ireq);
 
 #define MAP_FMT        "(%p) 0x%lld s:%x" DENTRY_FMT
 #define MAP_ARGS(m) (m), (long long)(m)->index,         (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index fba74e9c4a56..6f49018e3988 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -28,6 +28,8 @@ enum
        PCS_IREQ_CUSTOM = 16,   /* generic request */
        PCS_IREQ_WRAID  = 17,   /* compound raid6 write request */
        PCS_IREQ_RRAID  = 18,   /* compound raid6 read request */
+       PCS_IREQ_GETMAP = 19,   /* get mapping for kdirect mode */
+       PCS_IREQ_TOKEN  = 20,   /* dummy token to allocate congestion window */
        PCS_IREQ_KAPI   = 65    /* IO request from kernel API */
 };
 
@@ -86,6 +88,13 @@ struct pcs_int_request
        */
        struct work_struct worker;
 
+       /* The following tok_* fields are sequenced by completion_data.child_lock
+        * NOTE: cs->lock can be taken under this lock.
+        */
+       struct list_head        tok_list;
+       u64                     tok_reserved;
+       u64                     tok_serno;
+
        union {
                struct {
                        struct pcs_map_entry    *map;
@@ -112,6 +121,12 @@ struct pcs_int_request
                        struct pcs_msg          *msg;
                } flushreq;
 
+               struct {
+                       struct pcs_int_request  *parent;
+                       struct list_head        tok_link;
+                       int                     cs_index;
+               } token;
+
                struct {
                        u64                     offset;
                        int                     phase;
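
The locking comment added to pcs_req.h above fixes the lock order:
completion_data.child_lock is the outer lock and cs->lock may be taken under
it, never the other way around. ireq_queue_tokens() is the canonical user;
pcs_cs_wakeup() stays consistent by dropping cs->lock before it touches the
parent. Roughly:

	spin_lock(&parent->completion_data.child_lock);	/* outer */
	...
	spin_lock(&cs->lock);	/* inner, via pcs_cs_cong_enqueue_cond() */
	spin_unlock(&cs->lock);
	...
	spin_unlock(&parent->completion_data.child_lock);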