Module parameter cs_enable_fanout=1 is required.

When fanout is legitimate, writes are sent not along a chain of CSes
but to each CS point-to-point. This works especially nicely with
kernel-accelerated CSes, which were already partly fanout.

This can be useful when the network is very fast and sending 3 requests
instead of one over the same network is not a problem. It also decreases
the load on the CSes, since they no longer need to forward along the chain,
but it increases the load on the client. Whether this yields a net gain
depends on the setup.

Signed-off-by: Alexey Kuznetsov <kuz...@acronis.com>
---
 fs/fuse/kio/pcs/pcs_cs.c         | 214 ++++++++++++++++++++++++++++++++++-----
 fs/fuse/kio/pcs/pcs_cs.h         |   1 +
 fs/fuse/kio/pcs/pcs_cs_accel.c   |  61 +++++++----
 fs/fuse/kio/pcs/pcs_cs_prot.h    |  14 ++-
 fs/fuse/kio/pcs/pcs_map.c        |   5 +
 fs/fuse/kio/pcs/pcs_map.h        |   1 +
 fs/fuse/kio/pcs/pcs_prot_types.h |   1 +
 fs/fuse/kio/pcs/pcs_req.h        |  14 ++-
 8 files changed, 261 insertions(+), 50 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index fffbce6..59b752d 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -292,7 +292,22 @@ void cs_log_io_times(struct pcs_int_request * ireq, struct 
pcs_msg * resp, unsig
        int reqt = h->hdr.type != PCS_CS_SYNC_RESP ? ireq->iochunk.cmd : 
PCS_REQ_T_SYNC;
 
        if (ireq->iochunk.parent_N && h->hdr.type != PCS_CS_READ_RESP && 
h->hdr.type != PCS_CS_FIEMAP_RESP) {
-               pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, 
resp->rpc->peer_id);
+               if (!(ireq->flags & IREQ_F_FANOUT)) {
+                       pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, 
resp->rpc->peer_id);
+               } else {
+                       int idx = ireq->iochunk.cs_index;
+                       struct pcs_int_request * parent = 
ireq->iochunk.parent_N;
+
+                       parent->iochunk.fo.io_times[idx].csid = 
resp->rpc->peer_id.val;
+                       /* XXX kio does not implement flow detection (for now) 
and does
+                        * not use flag PCS_CS_IO_SEQ. So, use it here to 
indicate
+                        * performed fanout.
+                        */
+                       parent->iochunk.fo.io_times[idx].misc = h->sync.misc | 
PCS_CS_IO_SEQ;
+                       parent->iochunk.fo.io_times[idx].ts_net = 
h->sync.ts_net;
+                       parent->iochunk.fo.io_times[idx].ts_io = h->sync.ts_io;
+
+               }
                return;
        }
 
@@ -605,41 +620,105 @@ static void cs_sent(struct pcs_msg *msg)
        pcs_rpc_sent(msg);
 }
 
-void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+static void ireq_complete_fo(struct pcs_int_request * ireq)
 {
-       struct pcs_msg *msg = &ireq->iochunk.msg;
-       struct pcs_cs_iohdr *ioh;
-       struct pcs_cs_list *csl = ireq->iochunk.csl;
-       struct pcs_map_entry *map = ireq->iochunk.map; /* ireq keeps reference 
to map */
-       int storage_version = atomic_read(&ireq->cc->storage_version);
-       int aligned_msg;
+       if (!atomic_dec_and_test(&ireq->iochunk.fo.iocount))
+               return;
 
-       BUG_ON(msg->rpc);
+       if (pcs_if_error(&ireq->error)) {
+               FUSE_KTRACE(ireq->cc->fc, "IO error %d %lu, ireq:%p : 
%llu:%u+%u",
+                     ireq->error.value,
+                     ireq->error.remote ? (unsigned 
long)ireq->error.offender.val : 0UL,
+                     ireq, (unsigned long long)ireq->iochunk.chunk,
+                     (unsigned)ireq->iochunk.offset,
+                     (unsigned)ireq->iochunk.size);
+       } else if (ireq->iochunk.parent_N) {
+               struct pcs_int_request * parent = ireq->iochunk.parent_N;
+               int n = ireq->iochunk.fo.num_iotimes;
+               int idx = ireq->iochunk.cs_index;
 
-       ireq->ts_sent = ktime_get();
+               if (idx < parent->iochunk.acr.num_awr)
+                       idx = parent->iochunk.acr.num_awr;
 
-       if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags 
& IREQ_F_NO_ACCEL)) {
-               if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
-                       if (pcs_csa_cs_submit(cs, ireq))
-                               return;
-               } else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
-                       /* Synchronous writes in accel mode are still not 
supported */
-                       if (!(ireq->dentry->fileinfo.attr.attrib & 
PCS_FATTR_IMMEDIATE_WRITE) &&
-                           !ireq->dentry->no_write_delay) {
-                               struct pcs_int_request * sreq;
+               if (n > PCS_MAX_ACCEL_CS)
+                       n = PCS_MAX_ACCEL_CS;
 
-                               sreq = pcs_csa_csl_write_submit(ireq);
-                               if (!sreq)
-                                       return;
-                               if (sreq != ireq) {
-                                       ireq = sreq;
-                                       cs = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
-                                       msg = &ireq->iochunk.msg;
+               if (n > idx) {
+                       memcpy(&parent->iochunk.acr.io_times[idx], 
&ireq->iochunk.fo.io_times[idx],
+                              (n - idx) * sizeof(struct fuse_tr_iotimes_cs));
+               }
+               parent->iochunk.acr.num_iotimes = n;
+       } else {
+               struct fuse_conn * fc = container_of(ireq->cc, struct 
pcs_fuse_cluster, cc)->fc;
+
+               fuse_stat_observe(fc, PCS_REQ_T_WRITE, ktime_sub(ktime_get(), 
ireq->ts_sent));
+
+               if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+                       struct fuse_trace_hdr * t;
+                       int n = ireq->iochunk.fo.num_iotimes;
+
+                       t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, 
sizeof(struct fuse_tr_iotimes_hdr) +
+                                              n*sizeof(struct 
fuse_tr_iotimes_cs));
+                       if (t) {
+                               struct fuse_tr_iotimes_hdr * th = (struct 
fuse_tr_iotimes_hdr *)(t + 1);
+                               struct fuse_tr_iotimes_cs * ch = (struct 
fuse_tr_iotimes_cs *)(th + 1);
+                               int i;
+
+                               th->chunk = ireq->iochunk.chunk;
+                               th->offset = ireq->iochunk.chunk + 
ireq->iochunk.offset;
+                               th->size = ireq->iochunk.size;
+                               th->start_time = ktime_to_us(ireq->ts);
+                               th->local_delay = 
ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+                               th->lat = t->time - ktime_to_us(ireq->ts_sent);
+                               th->ino = ireq->dentry->fileinfo.attr.id;
+                               th->type = PCS_CS_WRITE_AL_RESP;
+                               th->cses = n;
+
+                               for (i = 0; i < n; i++, ch++) {
+                                       *ch = ireq->iochunk.fo.io_times[i];
                                }
                        }
                }
+               FUSE_TRACE_COMMIT(fc->ktrace);
+       }
+       ireq->iochunk.msg.destructor = NULL;
+       ireq->iochunk.msg.rpc = NULL;
+       ireq_complete(ireq);
+}
+
+static void complete_fo_request(struct pcs_int_request * sreq)
+{
+       struct pcs_int_request * ireq = sreq->iochunk.parent_N;
+
+       if (pcs_if_error(&sreq->error)) {
+               if (!pcs_if_error(&ireq->error))
+                       ireq->error = sreq->error;
        }
 
+       /* And free all clone resources */
+       pcs_sreq_detach(sreq);
+       if (sreq->iochunk.map)
+               pcs_map_put(sreq->iochunk.map);
+       if (sreq->iochunk.csl)
+               cslist_put(sreq->iochunk.csl);
+       if (sreq->iochunk.flow)
+               pcs_flow_put(sreq->iochunk.flow, &sreq->cc->maps.ftab);
+       ireq_destroy(sreq);
+
+       ireq_complete_fo(ireq);
+}
+
+static void do_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+{
+       struct pcs_msg *msg = &ireq->iochunk.msg;
+       struct pcs_cs_iohdr *ioh;
+       struct pcs_cs_list *csl = ireq->iochunk.csl;
+       struct pcs_map_entry *map = ireq->iochunk.map; /* ireq keeps reference 
to map */
+       int storage_version = atomic_read(&ireq->cc->storage_version);
+       int aligned_msg;
+
+       BUG_ON(msg->rpc);
+
        msg->private = cs;
        msg->private2 = ireq;
 
@@ -685,6 +764,8 @@ void pcs_cs_submit(struct pcs_cs *cs, struct 
pcs_int_request *ireq)
                 ioh->sync.misc |= PCS_CS_IO_NOCSUM;
        if ((ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) || 
ireq->dentry->no_write_delay)
                ioh->sync.misc |= PCS_CS_IO_SYNC;
+       if (ireq->flags & IREQ_F_FANOUT)
+               ioh->sync.misc |= PCS_CS_IO_FANOUT;
 
        msg->size = ioh->hdr.len;
        msg->rpc = NULL;
@@ -731,6 +812,87 @@ void pcs_cs_submit(struct pcs_cs *cs, struct 
pcs_int_request *ireq)
        pcs_rpc_queue(cs->rpc, msg);
 }
 
+static inline int eligible_for_fanout(struct pcs_int_request * ireq)
+{
+       return (cs_enable_fanout && 
pcs_cs_fanout(atomic_read(&ireq->cc->storage_version)) &&
+               ireq->iochunk.csl->nsrv <= PCS_MAP_MAX_FO_CS &&
+               ireq->iochunk.cs_index + 1 < ireq->iochunk.csl->nsrv &&
+               !(ireq->iochunk.csl->flags & CS_FL_REPLICATING));
+}
+
+void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+{
+       ireq->ts_sent = ktime_get();
+
+       if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags 
& IREQ_F_NO_ACCEL)) {
+               if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
+                       if (pcs_csa_cs_submit(cs, ireq))
+                               return;
+               } else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
+                       /* Synchronous writes in accel mode are still not 
supported */
+                       if (!(ireq->dentry->fileinfo.attr.attrib & 
PCS_FATTR_IMMEDIATE_WRITE) &&
+                           !ireq->dentry->no_write_delay) {
+                               struct pcs_int_request * sreq;
+
+                               sreq = pcs_csa_csl_write_submit(ireq);
+                               if (!sreq)
+                                       return;
+                               if (sreq != ireq) {
+                                       ireq = sreq;
+                                       cs = 
ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+                               }
+                       }
+               }
+       }
+
+       if (ireq->iochunk.cmd == PCS_REQ_T_WRITE && eligible_for_fanout(ireq)) {
+               int idx = ireq->iochunk.cs_index;
+               struct pcs_cs_list * csl = ireq->iochunk.csl;
+
+               atomic_set(&ireq->iochunk.fo.iocount, 1);
+               ireq->iochunk.fo.num_iotimes = csl->nsrv;
+
+               for (; idx < csl->nsrv; idx++) {
+                       struct pcs_int_request * sreq;
+
+                       sreq = pcs_ireq_split(ireq, 0, 1);
+                       if (sreq == NULL) {
+                               ireq->error.remote = 1;
+                               ireq->error.offender = 
ireq->iochunk.csl->cs[idx].info.id;
+                               ireq->error.value = PCS_ERR_NORES;
+                               ireq_complete_fo(ireq);
+                               return;
+                       }
+
+                       sreq->iochunk.size = ireq->iochunk.size;
+                       sreq->iochunk.csl = ireq->iochunk.csl;
+                       cslist_get(ireq->iochunk.csl);
+                       sreq->flags |= IREQ_F_NOACCT|IREQ_F_FANOUT;
+                       sreq->complete_cb = complete_fo_request;
+                       sreq->iochunk.parent_N = ireq;
+                       sreq->iochunk.cs_index = idx;
+                       atomic_inc(&ireq->iochunk.fo.iocount);
+
+                       /* If it is not the first cs on remaining chain, we can
+                        * try to check for eligibility for direct cs access.
+                        * This will handle the case when a local cs is not at 
head of
+                        * chain.
+                        */
+                       if (idx == ireq->iochunk.cs_index ||
+                           (ireq->dentry->fileinfo.attr.attrib & 
PCS_FATTR_IMMEDIATE_WRITE) ||
+                           ireq->dentry->no_write_delay ||
+                           ((ireq->iochunk.size|ireq->iochunk.offset) & 511) ||
+                           (ireq->flags & IREQ_F_NO_ACCEL) ||
+                           !pcs_csa_csl_write_submit_single(sreq, idx))
+                               
do_cs_submit(ireq->iochunk.csl->cs[idx].cslink.cs, sreq);
+               }
+               ireq_complete_fo(ireq);
+               return;
+       }
+
+       do_cs_submit(cs, ireq);
+}
+
 static void handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
 {
        struct pcs_cs *who;
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 0b17e924a..61be99a 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -215,6 +215,7 @@ static inline bool cs_is_blacklisted(struct pcs_cs *cs)
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq);
 struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * 
ireq);
+int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx);
 void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr 
* h, PCS_NODE_ID_T cs_id);
 void pcs_csa_cs_detach(struct pcs_cs * cs);
 
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index ae8562e..3b75571 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -692,7 +692,6 @@ int pcs_csa_cs_submit(struct pcs_cs * cs, struct 
pcs_int_request * ireq)
 
 static void ireq_init_acr(struct pcs_int_request * ireq)
 {
-       ireq->iochunk.parent_N = NULL;
        atomic_set(&ireq->iochunk.acr.iocount, 1);
        ireq->iochunk.acr.num_awr = 0;
        ireq->iochunk.acr.num_iotimes = 0;
@@ -715,7 +714,6 @@ static void ireq_clear_acr(struct pcs_int_request * ireq)
        }
        ireq->iochunk.msg.destructor = NULL;
        ireq->iochunk.msg.rpc = NULL;
-       ireq->iochunk.parent_N = NULL;
        ireq->flags |= IREQ_F_NO_ACCEL;
 }
 
@@ -724,13 +722,19 @@ void pcs_csa_relay_iotimes(struct pcs_int_request * ireq, 
 struct pcs_cs_iohdr *
        int idx = ireq->iochunk.acr.num_awr;
        struct pcs_cs_sync_resp * srec;
 
-       ireq->iochunk.acr.io_times[idx].cs_id = cs_id;
-       ireq->iochunk.acr.io_times[idx].sync = h->sync;
+       ireq->iochunk.acr.io_times[idx].csid = cs_id.val;
+       ireq->iochunk.acr.io_times[idx].misc = h->sync.misc;
+       ireq->iochunk.acr.io_times[idx].ts_net = h->sync.ts_net;
+       ireq->iochunk.acr.io_times[idx].ts_io = h->sync.ts_io;
 
        for (srec = (struct pcs_cs_sync_resp*)(h + 1), idx++;
             (void*)(srec + 1) <= (void*)h + h->hdr.len && idx < 
PCS_MAX_ACCEL_CS;
-            srec++, idx++)
-               ireq->iochunk.acr.io_times[idx] = *srec;
+            srec++, idx++) {
+               ireq->iochunk.acr.io_times[idx].csid = srec->cs_id.val;
+               ireq->iochunk.acr.io_times[idx].misc = srec->sync.misc;
+               ireq->iochunk.acr.io_times[idx].ts_net = srec->sync.ts_net;
+               ireq->iochunk.acr.io_times[idx].ts_io = srec->sync.ts_io;
+       }
 
        ireq->iochunk.acr.num_iotimes = idx;
 }
@@ -746,6 +750,13 @@ static void __complete_acr_work(struct work_struct * w)
                      ireq, (unsigned long long)ireq->iochunk.chunk,
                      (unsigned)ireq->iochunk.offset,
                      (unsigned)ireq->iochunk.size);
+       } else if (ireq->iochunk.parent_N) {
+               struct pcs_int_request * parent = ireq->iochunk.parent_N;
+               int idx = ireq->iochunk.cs_index;
+
+               WARN_ON(!(ireq->flags & IREQ_F_FANOUT));
+               parent->iochunk.fo.io_times[idx] = 
ireq->iochunk.acr.io_times[idx];
+               parent->iochunk.fo.io_times[idx].misc |= PCS_CS_IO_SEQ;
        } else {
                struct fuse_conn * fc = container_of(ireq->cc, struct 
pcs_fuse_cluster, cc)->fc;
 
@@ -772,13 +783,8 @@ static void __complete_acr_work(struct work_struct * w)
                                th->type = PCS_CS_WRITE_AL_RESP;
                                th->cses = n;
 
-                               for (i = 0; i < n; i++) {
-                                       ch->csid = 
ireq->iochunk.acr.io_times[i].cs_id.val;
-                                       ch->misc = 
ireq->iochunk.acr.io_times[i].sync.misc;
-                                       ch->ts_net = 
ireq->iochunk.acr.io_times[i].sync.ts_net;
-                                       ch->ts_io = 
ireq->iochunk.acr.io_times[i].sync.ts_io;
-                                       ch++;
-                               }
+                               for (i = 0; i < n; i++, ch++)
+                                       *ch = ireq->iochunk.acr.io_times[i];
                        }
                }
                FUSE_TRACE_COMMIT(fc->ktrace);
@@ -807,10 +813,11 @@ static void __pcs_csa_write_final_completion(struct 
pcs_accel_write_req *areq)
        ireq = container_of(areq - areq->index, struct pcs_int_request, 
iochunk.acr.awr[0]);
 
        if (!pcs_if_error(&ireq->error)) {
-               struct pcs_cs_sync_resp * sresp = 
&ireq->iochunk.acr.io_times[areq->index];
-               sresp->cs_id.val = 
ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
-               sresp->sync.ts_net = 0;
-               sresp->sync.ts_io = ktime_to_us(ktime_get()) - sresp->sync.misc;
+               struct fuse_tr_iotimes_cs * th = 
&ireq->iochunk.acr.io_times[areq->index];
+               th->csid = ireq->iochunk.csl->cs[areq->index].info.id.val | 
PCS_NODE_ALT_MASK;
+               th->ts_net = 0;
+               th->ts_io = ktime_to_us(ktime_get()) - th->misc;
+               th->misc &= PCS_CS_TS_MASK;
        }
 
        csa_complete_acr(ireq);
@@ -992,7 +999,7 @@ static inline int csa_submit_write(struct file * file, 
struct pcs_int_request *
        areq->index = idx;
        ireq->iochunk.acr.num_awr = idx + 1;
 
-       ireq->iochunk.acr.io_times[idx].sync.misc = ktime_to_us(ktime_get());
+       ireq->iochunk.acr.io_times[idx].misc = ktime_to_us(ktime_get());
 
        ret = call_write_iter(file, iocb, it);
 
@@ -1090,7 +1097,6 @@ static void complete_N_request(struct pcs_int_request * 
sreq)
        csa_complete_acr(ireq);
 }
 
-
 struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * 
ireq)
 {
        int idx;
@@ -1149,6 +1155,23 @@ struct pcs_int_request * pcs_csa_csl_write_submit(struct 
pcs_int_request * ireq)
 }
 
 
+int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx)
+{
+       if (idx >= PCS_MAX_ACCEL_CS)
+               return 0;
+
+       ireq_init_acr(ireq);
+
+       if (!csa_cs_submit_write(ireq, idx)) {
+               ireq_clear_acr(ireq);
+               return 0;
+       }
+
+       ireq->iochunk.acr.num_iotimes = idx;
+       csa_complete_acr(ireq);
+       return 1;
+}
+
 static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 {
        struct pcs_csa_context *ctx = file->private_data;
diff --git a/fs/fuse/kio/pcs/pcs_cs_prot.h b/fs/fuse/kio/pcs/pcs_cs_prot.h
index 6e0d371..c72066d 100644
--- a/fs/fuse/kio/pcs/pcs_cs_prot.h
+++ b/fs/fuse/kio/pcs/pcs_cs_prot.h
@@ -40,10 +40,13 @@ struct pcs_cs_sync_data
 #define PCS_CS_IO_NOCSUM       (1ULL<<61)      /* Req: skip crc verification */
 #define PCS_CS_IO_SYNC         (1ULL<<60)      /* Req: DSYNC request */
 #define PCS_CS_IO_BACKGROUND   (1ULL<<59)      /* Req: low priority request */
+#define PCS_CS_IO_FANOUT       (1ULL<<58)      /* Req: request must not be 
forwarded */
+#define PCS_CS_IO_CLEAR                (1ULL<<57)      /* Req/resp: indicator 
that write is done to stable chain */
 
-#define PCS_CS_RESET_TS_RECV(sdata, ts)        do { (sdata)->misc = ((u64)ts & 
0xFFFFFFFFFFFFFULL); } while (0)
-#define PCS_CS_SET_TS_RECV(sdata, ts)  do { (sdata)->misc = ((sdata)->misc & 
~0xFFFFFFFFFFFFFULL) | ((u64)ts & 0xFFFFFFFFFFFFFULL); } while (0)
-#define PCS_CS_ADD_TS_RECV(sdata, ts)  do { (sdata)->misc |= ((u64)ts & 
0xFFFFFFFFFFFFFULL); } while (0)
+#define PCS_CS_TS_MASK         0xFFFFFFFFFFFFFULL
+#define PCS_CS_RESET_TS_RECV(sdata, ts)        do { (sdata)->misc = ((u64)ts & 
PCS_CS_TS_MASK); } while (0)
+#define PCS_CS_SET_TS_RECV(sdata, ts)  do { (sdata)->misc = ((sdata)->misc & 
~PCS_CS_TS_MASK) | ((u64)ts & PCS_CS_TS_MASK); } while (0)
+#define PCS_CS_ADD_TS_RECV(sdata, ts)  do { (sdata)->misc |= ((u64)ts & 
PCS_CS_TS_MASK); } while (0)
 #define PCS_CS_GET_TS_RECV(sdata)      ((sdata)->misc & 0xFFFFFFFFFFFFFULL)
 
 struct pcs_cs_sync_resp {
@@ -84,6 +87,11 @@ static inline int pcs_cs_use_aligned_io(u32 storage_version)
        return (storage_version >= PCS_CS_MSG_ALIGNED_VERSION);
 }
 
+static inline int pcs_cs_fanout(u32 storage_version)
+{
+       return (storage_version >= PCS_CS_FANOUT);
+}
+
 /* Maximal message size. Actually, random */
 #define PCS_CS_MSG_MAX_SIZE    (1024*1024 + sizeof(struct pcs_cs_iohdr))
 
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 634775b..29ed4fa 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -42,6 +42,10 @@
 module_param(cs_io_locality, uint, 0644);
 MODULE_PARM_DESC(cs_io_locality, "CS IO locality");
 
+unsigned int cs_enable_fanout = 0;
+module_param(cs_enable_fanout, uint, 0644);
+MODULE_PARM_DESC(cs_enable_fanout, "Enable CS fanout");
+
 static struct pcs_cs_list *cs_link_to_cs_list(struct pcs_cs_link *csl)
 {
        struct pcs_cs_record *cs_rec;
@@ -976,6 +980,7 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, 
struct pcs_cs_info *re
 
                if (cs_list->cs[i].info.flags & CS_FL_REPLICATING) {
                        __set_bit(i, &cs_list->blacklist);
+                       cs_list->flags |= CS_FL_REPLICATING;
                        cs_list->blacklist_expires = jiffies + 
PCS_REPLICATION_BLACKLIST_TIMEOUT;
                }
 
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index bfe0719..8cc7dfe 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -220,6 +220,7 @@ static inline struct pcs_cluster_core *cc_from_map(struct 
pcs_map_entry * m)
 void ireq_drop_tokens(struct pcs_int_request * ireq);
 
 extern unsigned int cs_io_locality;
+extern unsigned int cs_enable_fanout;
 
 void cslist_destroy(struct pcs_cs_list * csl);
 
diff --git a/fs/fuse/kio/pcs/pcs_prot_types.h b/fs/fuse/kio/pcs/pcs_prot_types.h
index d1ed548..def0073 100644
--- a/fs/fuse/kio/pcs/pcs_prot_types.h
+++ b/fs/fuse/kio/pcs/pcs_prot_types.h
@@ -23,6 +23,7 @@
 
 #define PCS_VZ7_VERSION 100
 #define PCS_CS_MSG_ALIGNED_VERSION 134
+#define PCS_CS_FANOUT 177
 
 /* milliseconds since Jan 1970 */
 typedef u64 PCS_FILETIME_T;
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 8ee32b3..c677332 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -15,6 +15,7 @@
 #include "pcs_cs_prot.h"
 #include "pcs_rpc.h"
 #include "pcs_cs.h"
+#include "fuse_ktrace_prot.h"
 #include "fuse_stat.h"
 #include "../../fuse_i.h"
 
@@ -60,6 +61,7 @@ struct pcs_aio_req
 };
 
 #define PCS_MAX_ACCEL_CS       3
+#define PCS_MAP_MAX_FO_CS      8
 
 struct pcs_accel_write_req
 {
@@ -75,12 +77,11 @@ struct pcs_accel_write_req
 
 struct pcs_accel_req
 {
-       struct pcs_int_request          *parent;
        atomic_t                        iocount;
        int                             num_awr;
        struct pcs_accel_write_req      awr[PCS_MAX_ACCEL_CS];
        int                             num_iotimes;
-       struct pcs_cs_sync_resp         io_times[PCS_MAX_ACCEL_CS];
+       struct fuse_tr_iotimes_cs       io_times[PCS_MAX_ACCEL_CS];
        struct work_struct              work;
 };
 
@@ -93,6 +94,13 @@ struct pcs_iochunk_req {
        struct pcs_int_request  *parent_N;
 };
 
+struct pcs_fo_req
+{
+       atomic_t                        iocount;
+       int                             num_iotimes;
+       struct fuse_tr_iotimes_cs       io_times[PCS_MAP_MAX_FO_CS];
+};
+
 struct pcs_int_request
 {
        struct pcs_cluster_core* cc;
@@ -119,6 +127,7 @@ struct pcs_int_request
 #define IREQ_F_CRYPT           0x2000
 #define IREQ_F_ACCELERROR      0x4000
 #define IREQ_F_NOACCT          0x8000
+#define IREQ_F_FANOUT         0x10000
 
        atomic_t                iocount;
 
@@ -179,6 +188,7 @@ struct pcs_int_request
                                        struct pcs_int_request  *parent_N;
                                };
                                struct pcs_iochunk_req          ir;
+                               struct pcs_fo_req               fo;
                                struct pcs_aio_req              ar;
                                struct pcs_accel_req            acr;
                        };
-- 
1.8.3.1

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to