Steve

I have not checked the code because I am away from my laptop,
but I assume mlx4 and mthca are using FMR and cxgb4 is using
rdma-read-with-invalidate, which has an implicit fence.
 
If the invalidate is issued before the read is completed, it's a problem.
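
To make that concrete, here is a rough, untested sketch of the change I am
suggesting for the non-read-with-invalidate path in rdma_read_chunk_frmr()
(based on the V3 code quoted below; treat it as illustrative only):

	} else {
		read_wr.opcode = IB_WR_RDMA_READ;
		read_wr.next = &inv_wr;
		/* Prepare invalidate */
		memset(&inv_wr, 0, sizeof(inv_wr));
		inv_wr.wr_id = (unsigned long)ctxt;
		inv_wr.opcode = IB_WR_LOCAL_INV;
		/* Fence the LOCAL_INV so the HCA does not start it until
		 * the preceding RDMA_READ has completed.
		 */
		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
	}

The IB_SEND_FENCE bit is what keeps the invalidate ordered behind the read on
devices that do not provide that ordering implicitly.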

Regards
Devesh
____________________________________
From: Steve Wise [sw...@opengridcomputing.com]
Sent: Friday, May 30, 2014 6:32 PM
To: Devesh Sharma; bfie...@fieldses.org
Cc: linux-...@vger.kernel.org; linux-rdma@vger.kernel.org; 
t...@opengridcomputing.com
Subject: RE: [PATCH V3] svcrdma: refactor marshalling logic

>
> Hi Steve
>
> I am testing this patch. I have found that when the server tries to initiate an
> RDMA-READ on an ocrdma device, the RDMA-READ posting fails because the FENCE
> bit is not set for non-iWARP devices that use FRMRs. Because of this, whenever
> the server tries to initiate an RDMA_READ operation, it fails with a completion
> error.
> This bug was present in v1 and v2 as well.
>

Why would the FENCE bit not be required for mlx4, mthca, cxgb4, and yet be 
required for ocrdma?


> Check inline for the exact location of the change.
>
> The rest is okay from my side; iozone passes with this patch, of course after
> putting in a FENCE indicator.
>
> -Regards
>  Devesh
>
> > -----Original Message-----
> > From: linux-rdma-ow...@vger.kernel.org [mailto:linux-rdma-
> > ow...@vger.kernel.org] On Behalf Of Steve Wise
> > Sent: Thursday, May 29, 2014 10:26 PM
> > To: bfie...@fieldses.org
> > Cc: linux-...@vger.kernel.org; linux-rdma@vger.kernel.org;
> > t...@opengridcomputing.com
> > Subject: [PATCH V3] svcrdma: refactor marshalling logic
> >
> > This patch refactors the NFSRDMA server marshalling logic to remove the
> > intermediary map structures.  It also fixes an existing bug where the
> > NFSRDMA server was not minding the device fast register page list length
> > limitations.
> >
> > I've also made a git repo available with these patches on top of 3.15-rc7:
> >
> > git://git.linux-nfs.org/projects/swise/linux.git svcrdma-refactor-v3
> >
> > Changes since V2:
> >
> > - fixed logic bug in rdma_read_chunk_frmr() and rdma_read_chunk_lcl()
> >
> > - in rdma_read_chunks(), set the reader function pointer only once since
> >   it doesn't change
> >
> > - squashed the patch back into one patch since the previous split wasn't
> >   bisectable
> >
> > Changes since V1:
> >
> > - fixed regression for devices that don't support FRMRs (see
> >   rdma_read_chunk_lcl())
> >
> > - split the patch up for closer review.  However, I request it be squashed
> >   before merging, as the split is not bisectable, and I think these changes
> >   should all be a single commit anyway.
> >
> > Please review, and test if you can.  I'd like this to hit 3.16.
> >
> > Signed-off-by: Tom Tucker <t...@opengridcomputing.com>
> > Signed-off-by: Steve Wise <sw...@opengridcomputing.com>
> > ---
> >
> >  include/linux/sunrpc/svc_rdma.h          |    3
> >  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c  |  643 +++++++++++++-----------------
> >  net/sunrpc/xprtrdma/svc_rdma_sendto.c    |  230 +----------
> >  net/sunrpc/xprtrdma/svc_rdma_transport.c |   62 ++-
> >  4 files changed, 332 insertions(+), 606 deletions(-)
> >
> > diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> > index 0b8e3e6..5cf99a0 100644
> > --- a/include/linux/sunrpc/svc_rdma.h
> > +++ b/include/linux/sunrpc/svc_rdma.h
> > @@ -115,14 +115,13 @@ struct svc_rdma_fastreg_mr {
> >     struct list_head frmr_list;
> >  };
> >  struct svc_rdma_req_map {
> > -   struct svc_rdma_fastreg_mr *frmr;
> >     unsigned long count;
> >     union {
> >             struct kvec sge[RPCSVC_MAXPAGES];
> >             struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES];
> > +           unsigned long lkey[RPCSVC_MAXPAGES];
> >     };
> >  };
> > -#define RDMACTXT_F_FAST_UNREG      1
> >  #define RDMACTXT_F_LAST_CTXT       2
> >
> >  #define    SVCRDMA_DEVCAP_FAST_REG         1       /* fast mr registration */
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > index 8d904e4..52d9f2c 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > @@ -1,4 +1,5 @@
> >  /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> >   * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> >   *
> >   * This software is available to you under a choice of one of two
> > @@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> >
> >     /* Set up the XDR head */
> >     rqstp->rq_arg.head[0].iov_base = page_address(page);
> > -   rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
> > +   rqstp->rq_arg.head[0].iov_len =
> > +           min_t(size_t, byte_count, ctxt->sge[0].length);
> >     rqstp->rq_arg.len = byte_count;
> >     rqstp->rq_arg.buflen = byte_count;
> >
> > @@ -85,7 +87,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> >             page = ctxt->pages[sge_no];
> >             put_page(rqstp->rq_pages[sge_no]);
> >             rqstp->rq_pages[sge_no] = page;
> > -           bc -= min(bc, ctxt->sge[sge_no].length);
> > +           bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
> >             rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
> >             sge_no++;
> >     }
> > @@ -113,291 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> >     rqstp->rq_arg.tail[0].iov_len = 0;
> >  }
> >
> > -/* Encode a read-chunk-list as an array of IB SGE
> > - *
> > - * Assumptions:
> > - * - chunk[0]->position points to pages[0] at an offset of 0
> > - * - pages[] is not physically or virtually contiguous and consists of
> > - *   PAGE_SIZE elements.
> > - *
> > - * Output:
> > - * - sge array pointing into pages[] array.
> > - * - chunk_sge array specifying sge index and count for each
> > - *   chunk in the read list
> > - *
> > - */
> > -static int map_read_chunks(struct svcxprt_rdma *xprt,
> > -                      struct svc_rqst *rqstp,
> > -                      struct svc_rdma_op_ctxt *head,
> > -                      struct rpcrdma_msg *rmsgp,
> > -                      struct svc_rdma_req_map *rpl_map,
> > -                      struct svc_rdma_req_map *chl_map,
> > -                      int ch_count,
> > -                      int byte_count)
> > +static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
> >  {
> > -   int sge_no;
> > -   int sge_bytes;
> > -   int page_off;
> > -   int page_no;
> > -   int ch_bytes;
> > -   int ch_no;
> > -   struct rpcrdma_read_chunk *ch;
> > +   if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
> > +        RDMA_TRANSPORT_IWARP)
> > +           return 1;
> > +   else
> > +           return min_t(int, sge_count, xprt->sc_max_sge);
> > +}
> >
> > -   sge_no = 0;
> > -   page_no = 0;
> > -   page_off = 0;
> > -   ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > -   ch_no = 0;
> > -   ch_bytes = ntohl(ch->rc_target.rs_length);
> > -   head->arg.head[0] = rqstp->rq_arg.head[0];
> > -   head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > -   head->arg.pages = &head->pages[head->count];
> > -   head->hdr_count = head->count; /* save count of hdr pages */
> > -   head->arg.page_base = 0;
> > -   head->arg.page_len = ch_bytes;
> > -   head->arg.len = rqstp->rq_arg.len + ch_bytes;
> > -   head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
> > -   head->count++;
> > -   chl_map->ch[0].start = 0;
> > -   while (byte_count) {
> > -           rpl_map->sge[sge_no].iov_base =
> > -                   page_address(rqstp->rq_arg.pages[page_no]) + page_off;
> > -           sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
> > -           rpl_map->sge[sge_no].iov_len = sge_bytes;
> > -           /*
> > -            * Don't bump head->count here because the same page
> > -            * may be used by multiple SGE.
> > -            */
> > -           head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
> > -           rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
> > +typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
> > +                         struct svc_rqst *rqstp,
> > +                         struct svc_rdma_op_ctxt *head,
> > +                         int *page_no,
> > +                         u32 *page_offset,
> > +                         u32 rs_handle,
> > +                         u32 rs_length,
> > +                         u64 rs_offset,
> > +                         int last);
> > +
> > +/* Issue an RDMA_READ using the local lkey to map the data sink */
> > +static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
> > +                          struct svc_rqst *rqstp,
> > +                          struct svc_rdma_op_ctxt *head,
> > +                          int *page_no,
> > +                          u32 *page_offset,
> > +                          u32 rs_handle,
> > +                          u32 rs_length,
> > +                          u64 rs_offset,
> > +                          int last)
> > +{
> > +   struct ib_send_wr read_wr;
> > +   int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
> > +   struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
> > +   int ret, read, pno;
> > +   u32 pg_off = *page_offset;
> > +   u32 pg_no = *page_no;
> > +
> > +   ctxt->direction = DMA_FROM_DEVICE;
> > +   ctxt->read_hdr = head;
> > +   pages_needed =
> > +           min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
> > +   read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
> > +
> > +   for (pno = 0; pno < pages_needed; pno++) {
> > +           int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
> > +
> > +           head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
> > +           head->arg.page_len += len;
> > +           head->arg.len += len;
> > +           if (!pg_off)
> > +                   head->count++;
> > +           rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
> >             rqstp->rq_next_page = rqstp->rq_respages + 1;
> > +           ctxt->sge[pno].addr =
> > +                   ib_dma_map_page(xprt->sc_cm_id->device,
> > +                                   head->arg.pages[pg_no], pg_off,
> > +                                   PAGE_SIZE - pg_off,
> > +                                   DMA_FROM_DEVICE);
> > +           ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
> > +                                      ctxt->sge[pno].addr);
> > +           if (ret)
> > +                   goto err;
> > +           atomic_inc(&xprt->sc_dma_used);
> >
> > -           byte_count -= sge_bytes;
> > -           ch_bytes -= sge_bytes;
> > -           sge_no++;
> > -           /*
> > -            * If all bytes for this chunk have been mapped to an
> > -            * SGE, move to the next SGE
> > -            */
> > -           if (ch_bytes == 0) {
> > -                   chl_map->ch[ch_no].count =
> > -                           sge_no - chl_map->ch[ch_no].start;
> > -                   ch_no++;
> > -                   ch++;
> > -                   chl_map->ch[ch_no].start = sge_no;
> > -                   ch_bytes = ntohl(ch->rc_target.rs_length);
> > -                   /* If bytes remaining account for next chunk */
> > -                   if (byte_count) {
> > -                           head->arg.page_len += ch_bytes;
> > -                           head->arg.len += ch_bytes;
> > -                           head->arg.buflen += ch_bytes;
> > -                   }
> > +           /* The lkey here is either a local dma lkey or a dma_mr lkey */
> > +           ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
> > +           ctxt->sge[pno].length = len;
> > +           ctxt->count++;
> > +
> > +           /* adjust offset and wrap to next page if needed */
> > +           pg_off += len;
> > +           if (pg_off == PAGE_SIZE) {
> > +                   pg_off = 0;
> > +                   pg_no++;
> >             }
> > -           /*
> > -            * If this SGE consumed all of the page, move to the
> > -            * next page
> > -            */
> > -           if ((sge_bytes + page_off) == PAGE_SIZE) {
> > -                   page_no++;
> > -                   page_off = 0;
> > -                   /*
> > -                    * If there are still bytes left to map, bump
> > -                    * the page count
> > -                    */
> > -                   if (byte_count)
> > -                           head->count++;
> > -           } else
> > -                   page_off += sge_bytes;
> > +           rs_length -= len;
> >     }
> > -   BUG_ON(byte_count != 0);
> > -   return sge_no;
> > +
> > +   if (last && rs_length == 0)
> > +           set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > +   else
> > +           clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > +
> > +   memset(&read_wr, 0, sizeof(read_wr));
> > +   read_wr.wr_id = (unsigned long)ctxt;
> > +   read_wr.opcode = IB_WR_RDMA_READ;
> > +   ctxt->wr_op = read_wr.opcode;
> > +   read_wr.send_flags = IB_SEND_SIGNALED;
> > +   read_wr.wr.rdma.rkey = rs_handle;
> > +   read_wr.wr.rdma.remote_addr = rs_offset;
> > +   read_wr.sg_list = ctxt->sge;
> > +   read_wr.num_sge = pages_needed;
> > +
> > +   ret = svc_rdma_send(xprt, &read_wr);
> > +   if (ret) {
> > +           pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
> > +           set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > +           goto err;
> > +   }
> > +
> > +   /* return current location in page array */
> > +   *page_no = pg_no;
> > +   *page_offset = pg_off;
> > +   ret = read;
> > +   atomic_inc(&rdma_stat_read);
> > +   return ret;
> > + err:
> > +   svc_rdma_unmap_dma(ctxt);
> > +   svc_rdma_put_context(ctxt, 0);
> > +   return ret;
> >  }
> >
> > -/* Map a read-chunk-list to an XDR and fast register the page-list.
> > - *
> > - * Assumptions:
> > - * - chunk[0]      position points to pages[0] at an offset of 0
> > - * - pages[]       will be made physically contiguous by creating a one-off
> > memory
> > - *         region using the fastreg verb.
> > - * - byte_count is # of bytes in read-chunk-list
> > - * - ch_count      is # of chunks in read-chunk-list
> > - *
> > - * Output:
> > - * - sge array pointing into pages[] array.
> > - * - chunk_sge array specifying sge index and count for each
> > - *   chunk in the read list
> > - */
> > -static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
> > +/* Issue an RDMA_READ using an FRMR to map the data sink */
> > +static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
> >                             struct svc_rqst *rqstp,
> >                             struct svc_rdma_op_ctxt *head,
> > -                           struct rpcrdma_msg *rmsgp,
> > -                           struct svc_rdma_req_map *rpl_map,
> > -                           struct svc_rdma_req_map *chl_map,
> > -                           int ch_count,
> > -                           int byte_count)
> > +                           int *page_no,
> > +                           u32 *page_offset,
> > +                           u32 rs_handle,
> > +                           u32 rs_length,
> > +                           u64 rs_offset,
> > +                           int last)
> >  {
> > -   int page_no;
> > -   int ch_no;
> > -   u32 offset;
> > -   struct rpcrdma_read_chunk *ch;
> > -   struct svc_rdma_fastreg_mr *frmr;
> > -   int ret = 0;
> > +   struct ib_send_wr read_wr;
> > +   struct ib_send_wr inv_wr;
> > +   struct ib_send_wr fastreg_wr;
> > +   u8 key;
> > +   int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
> > +   struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
> > +   struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
> > +   int ret, read, pno;
> > +   u32 pg_off = *page_offset;
> > +   u32 pg_no = *page_no;
> >
> > -   frmr = svc_rdma_get_frmr(xprt);
> >     if (IS_ERR(frmr))
> >             return -ENOMEM;
> >
> > -   head->frmr = frmr;
> > -   head->arg.head[0] = rqstp->rq_arg.head[0];
> > -   head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > -   head->arg.pages = &head->pages[head->count];
> > -   head->hdr_count = head->count; /* save count of hdr pages */
> > -   head->arg.page_base = 0;
> > -   head->arg.page_len = byte_count;
> > -   head->arg.len = rqstp->rq_arg.len + byte_count;
> > -   head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
> > +   ctxt->direction = DMA_FROM_DEVICE;
> > +   ctxt->frmr = frmr;
> > +   pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
> > +   read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
> >
> > -   /* Fast register the page list */
> > -   frmr->kva = page_address(rqstp->rq_arg.pages[0]);
> > +   frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
> >     frmr->direction = DMA_FROM_DEVICE;
> >     frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
> > -   frmr->map_len = byte_count;
> > -   frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
> > -   for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
> > -           frmr->page_list->page_list[page_no] =
> > +   frmr->map_len = pages_needed << PAGE_SHIFT;
> > +   frmr->page_list_len = pages_needed;
> > +
> > +   for (pno = 0; pno < pages_needed; pno++) {
> > +           int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
> > +
> > +           head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
> > +           head->arg.page_len += len;
> > +           head->arg.len += len;
> > +           if (!pg_off)
> > +                   head->count++;
> > +           rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
> > +           rqstp->rq_next_page = rqstp->rq_respages + 1;
> > +           frmr->page_list->page_list[pno] =
> >                     ib_dma_map_page(xprt->sc_cm_id->device,
> > -                                   rqstp->rq_arg.pages[page_no], 0,
> > +                                   head->arg.pages[pg_no], 0,
> >                                     PAGE_SIZE, DMA_FROM_DEVICE);
> > -           if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -                                    frmr->page_list->page_list[page_no]))
> > -                   goto fatal_err;
> > +           ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
> > +                                      frmr->page_list->page_list[pno]);
> > +           if (ret)
> > +                   goto err;
> >             atomic_inc(&xprt->sc_dma_used);
> > -           head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
> > -   }
> > -   head->count += page_no;
> > -
> > -   /* rq_respages points one past arg pages */
> > -   rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
> > -   rqstp->rq_next_page = rqstp->rq_respages + 1;
> >
> > -   /* Create the reply and chunk maps */
> > -   offset = 0;
> > -   ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > -   for (ch_no = 0; ch_no < ch_count; ch_no++) {
> > -           int len = ntohl(ch->rc_target.rs_length);
> > -           rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
> > -           rpl_map->sge[ch_no].iov_len = len;
> > -           chl_map->ch[ch_no].count = 1;
> > -           chl_map->ch[ch_no].start = ch_no;
> > -           offset += len;
> > -           ch++;
> > +           /* adjust offset and wrap to next page if needed */
> > +           pg_off += len;
> > +           if (pg_off == PAGE_SIZE) {
> > +                   pg_off = 0;
> > +                   pg_no++;
> > +           }
> > +           rs_length -= len;
> >     }
> >
> > -   ret = svc_rdma_fastreg(xprt, frmr);
> > -   if (ret)
> > -           goto fatal_err;
> > -
> > -   return ch_no;
> > -
> > - fatal_err:
> > -   printk("svcrdma: error fast registering xdr for xprt %p", xprt);
> > -   svc_rdma_put_frmr(xprt, frmr);
> > -   return -EIO;
> > -}
> > -
> > -static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
> > -                        struct svc_rdma_op_ctxt *ctxt,
> > -                        struct svc_rdma_fastreg_mr *frmr,
> > -                        struct kvec *vec,
> > -                        u64 *sgl_offset,
> > -                        int count)
> > -{
> > -   int i;
> > -   unsigned long off;
> > +   if (last && rs_length == 0)
> > +           set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > +   else
> > +           clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> >
> > -   ctxt->count = count;
> > -   ctxt->direction = DMA_FROM_DEVICE;
> > -   for (i = 0; i < count; i++) {
> > -           ctxt->sge[i].length = 0; /* in case map fails */
> > -           if (!frmr) {
> > -                   BUG_ON(!virt_to_page(vec[i].iov_base));
> > -                   off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
> > -                   ctxt->sge[i].addr =
> > -                           ib_dma_map_page(xprt->sc_cm_id->device,
> > -                                           virt_to_page(vec[i].iov_base),
> > -                                           off,
> > -                                           vec[i].iov_len,
> > -                                           DMA_FROM_DEVICE);
> > -                   if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -                                            ctxt->sge[i].addr))
> > -                           return -EINVAL;
> > -                   ctxt->sge[i].lkey = xprt->sc_dma_lkey;
> > -                   atomic_inc(&xprt->sc_dma_used);
> > -           } else {
> > -                   ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
> > -                   ctxt->sge[i].lkey = frmr->mr->lkey;
> > -           }
> > -           ctxt->sge[i].length = vec[i].iov_len;
> > -           *sgl_offset = *sgl_offset + vec[i].iov_len;
> > +   /* Bump the key */
> > +   key = (u8)(frmr->mr->lkey & 0x000000FF);
> > +   ib_update_fast_reg_key(frmr->mr, ++key);
> > +
> > +   ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
> > +   ctxt->sge[0].lkey = frmr->mr->lkey;
> > +   ctxt->sge[0].length = read;
> > +   ctxt->count = 1;
> > +   ctxt->read_hdr = head;
> > +
> > +   /* Prepare FASTREG WR */
> > +   memset(&fastreg_wr, 0, sizeof(fastreg_wr));
> > +   fastreg_wr.opcode = IB_WR_FAST_REG_MR;
> > +   fastreg_wr.send_flags = IB_SEND_SIGNALED;
> > +   fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
> > +   fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
> > +   fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
> > +   fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
> > +   fastreg_wr.wr.fast_reg.length = frmr->map_len;
> > +   fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
> > +   fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
> > +   fastreg_wr.next = &read_wr;
> > +
> > +   /* Prepare RDMA_READ */
> > +   memset(&read_wr, 0, sizeof(read_wr));
> > +   read_wr.send_flags = IB_SEND_SIGNALED;
> > +   read_wr.wr.rdma.rkey = rs_handle;
> > +   read_wr.wr.rdma.remote_addr = rs_offset;
> > +   read_wr.sg_list = ctxt->sge;
> > +   read_wr.num_sge = 1;
> > +   if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
> > +           read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
> > +           read_wr.wr_id = (unsigned long)ctxt;
> > +           read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
> > +   } else {
> > +           read_wr.opcode = IB_WR_RDMA_READ;
> > +           read_wr.next = &inv_wr;
> > +           /* Prepare invalidate */
> > +           memset(&inv_wr, 0, sizeof(inv_wr));
> > +           inv_wr.wr_id = (unsigned long)ctxt;
> > +           inv_wr.opcode = IB_WR_LOCAL_INV;
> > +           inv_wr.send_flags = IB_SEND_SIGNALED;
>
> Change this to inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
>
> > +           inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
> > +   }
> > +   ctxt->wr_op = read_wr.opcode;
> > +
> > +   /* Post the chain */
> > +   ret = svc_rdma_send(xprt, &fastreg_wr);
> > +   if (ret) {
> > +           pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
> > +           set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > +           goto err;
> >     }
> > -   return 0;
> > -}
> >
> > -static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
> > -{
> > -   if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
> > -        RDMA_TRANSPORT_IWARP) &&
> > -       sge_count > 1)
> > -           return 1;
> > -   else
> > -           return min_t(int, sge_count, xprt->sc_max_sge);
> > +   /* return current location in page array */
> > +   *page_no = pg_no;
> > +   *page_offset = pg_off;
> > +   ret = read;
> > +   atomic_inc(&rdma_stat_read);
> > +   return ret;
> > + err:
> > +   svc_rdma_unmap_dma(ctxt);
> > +   svc_rdma_put_context(ctxt, 0);
> > +   svc_rdma_put_frmr(xprt, frmr);
> > +   return ret;
> >  }
> >
> > -/*
> > - * Use RDMA_READ to read data from the advertised client buffer into the
> > - * XDR stream starting at rq_arg.head[0].iov_base.
> > - * Each chunk in the array
> > - * contains the following fields:
> > - * discrim      - '1', This isn't used for data placement
> > - * position     - The xdr stream offset (the same for every chunk)
> > - * handle       - RMR for client memory region
> > - * length       - data transfer length
> > - * offset       - 64 bit tagged offset in remote memory region
> > - *
> > - * On our side, we need to read into a pagelist. The first page immediately
> > - * follows the RPC header.
> > - *
> > - * This function returns:
> > - * 0 - No error and no read-list found.
> > - *
> > - * 1 - Successful read-list processing. The data is not yet in
> > - * the pagelist and therefore the RPC request must be deferred. The
> > - * I/O completion will enqueue the transport again and
> > - * svc_rdma_recvfrom will complete the request.
> > - *
> > - * <0 - Error processing/posting read-list.
> > - *
> > - * NOTE: The ctxt must not be touched after the last WR has been posted
> > - * because the I/O completion processing may occur on another
> > - * processor and free / modify the context. Ne touche pas!
> > - */
> > -static int rdma_read_xdr(struct svcxprt_rdma *xprt,
> > -                    struct rpcrdma_msg *rmsgp,
> > -                    struct svc_rqst *rqstp,
> > -                    struct svc_rdma_op_ctxt *hdr_ctxt)
> > +static int rdma_read_chunks(struct svcxprt_rdma *xprt,
> > +                       struct rpcrdma_msg *rmsgp,
> > +                       struct svc_rqst *rqstp,
> > +                       struct svc_rdma_op_ctxt *head)
> >  {
> > -   struct ib_send_wr read_wr;
> > -   struct ib_send_wr inv_wr;
> > -   int err = 0;
> > -   int ch_no;
> > -   int ch_count;
> > -   int byte_count;
> > -   int sge_count;
> > -   u64 sgl_offset;
> > +   int page_no, ch_count, ret;
> >     struct rpcrdma_read_chunk *ch;
> > -   struct svc_rdma_op_ctxt *ctxt = NULL;
> > -   struct svc_rdma_req_map *rpl_map;
> > -   struct svc_rdma_req_map *chl_map;
> > +   u32 page_offset, byte_count;
> > +   u64 rs_offset;
> > +   rdma_reader_fn reader;
> >
> >     /* If no read list is present, return 0 */
> >     ch = svc_rdma_get_read_chunk(rmsgp);
> > @@ -408,122 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
> >     if (ch_count > RPCSVC_MAXPAGES)
> >             return -EINVAL;
> >
> > -   /* Allocate temporary reply and chunk maps */
> > -   rpl_map = svc_rdma_get_req_map();
> > -   chl_map = svc_rdma_get_req_map();
> > +   /* The request is completed when the RDMA_READs complete. The
> > +    * head context keeps all the pages that comprise the
> > +    * request.
> > +    */
> > +   head->arg.head[0] = rqstp->rq_arg.head[0];
> > +   head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > +   head->arg.pages = &head->pages[head->count];
> > +   head->hdr_count = head->count;
> > +   head->arg.page_base = 0;
> > +   head->arg.page_len = 0;
> > +   head->arg.len = rqstp->rq_arg.len;
> > +   head->arg.buflen = rqstp->rq_arg.buflen;
> >
> > -   if (!xprt->sc_frmr_pg_list_len)
> > -           sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
> > -                                       rpl_map, chl_map, ch_count,
> > -                                       byte_count);
> > +   /* Use FRMR if supported */
> > +   if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
> > +           reader = rdma_read_chunk_frmr;
> >     else
> > -           sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
> > -                                            rpl_map, chl_map, ch_count,
> > -                                            byte_count);
> > -   if (sge_count < 0) {
> > -           err = -EIO;
> > -           goto out;
> > -   }
> > -
> > -   sgl_offset = 0;
> > -   ch_no = 0;
> > +           reader = rdma_read_chunk_lcl;
> >
> > +   page_no = 0; page_offset = 0;
> >     for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > -        ch->rc_discrim != 0; ch++, ch_no++) {
> > -           u64 rs_offset;
> > -next_sge:
> > -           ctxt = svc_rdma_get_context(xprt);
> > -           ctxt->direction = DMA_FROM_DEVICE;
> > -           ctxt->frmr = hdr_ctxt->frmr;
> > -           ctxt->read_hdr = NULL;
> > -           clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > -           clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > +        ch->rc_discrim != 0; ch++) {
> >
> > -           /* Prepare READ WR */
> > -           memset(&read_wr, 0, sizeof read_wr);
> > -           read_wr.wr_id = (unsigned long)ctxt;
> > -           read_wr.opcode = IB_WR_RDMA_READ;
> > -           ctxt->wr_op = read_wr.opcode;
> > -           read_wr.send_flags = IB_SEND_SIGNALED;
> > -           read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
> >             xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
> >                              &rs_offset);
> > -           read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
> > -           read_wr.sg_list = ctxt->sge;
> > -           read_wr.num_sge =
> > -                   rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
> > -           err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
> > -                                   &rpl_map->sge[chl_map->ch[ch_no].start],
> > -                                   &sgl_offset,
> > -                                   read_wr.num_sge);
> > -           if (err) {
> > -                   svc_rdma_unmap_dma(ctxt);
> > -                   svc_rdma_put_context(ctxt, 0);
> > -                   goto out;
> > -           }
> > -           if (((ch+1)->rc_discrim == 0) &&
> > -               (read_wr.num_sge == chl_map->ch[ch_no].count)) {
> > -                   /*
> > -                    * Mark the last RDMA_READ with a bit to
> > -                    * indicate all RPC data has been fetched from
> > -                    * the client and the RPC needs to be enqueued.
> > -                    */
> > -                   set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > -                   if (hdr_ctxt->frmr) {
> > -                           set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > -                           /*
> > -                            * Invalidate the local MR used to map the
> > data
> > -                            * sink.
> > -                            */
> > -                           if (xprt->sc_dev_caps &
> > -                               SVCRDMA_DEVCAP_READ_W_INV) {
> > -                                   read_wr.opcode =
> > -                                           IB_WR_RDMA_READ_WITH_INV;
> > -                                   ctxt->wr_op = read_wr.opcode;
> > -                                   read_wr.ex.invalidate_rkey =
> > -                                           ctxt->frmr->mr->lkey;
> > -                           } else {
> > -                                   /* Prepare INVALIDATE WR */
> > -                                   memset(&inv_wr, 0, sizeof inv_wr);
> > -                                   inv_wr.opcode = IB_WR_LOCAL_INV;
> > -                                   inv_wr.send_flags = IB_SEND_SIGNALED;
> > -                                   inv_wr.ex.invalidate_rkey =
> > -                                           hdr_ctxt->frmr->mr->lkey;
> > -                                   read_wr.next = &inv_wr;
> > -                           }
> > -                   }
> > -                   ctxt->read_hdr = hdr_ctxt;
> > -           }
> > -           /* Post the read */
> > -           err = svc_rdma_send(xprt, &read_wr);
> > -           if (err) {
> > -                   printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
> > -                          err);
> > -                   set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > -                   svc_rdma_unmap_dma(ctxt);
> > -                   svc_rdma_put_context(ctxt, 0);
> > -                   goto out;
> > +           byte_count = ntohl(ch->rc_target.rs_length);
> > +
> > +           while (byte_count > 0) {
> > +                   ret = reader(xprt, rqstp, head,
> > +                                &page_no, &page_offset,
> > +                                ntohl(ch->rc_target.rs_handle),
> > +                                byte_count, rs_offset,
> > +                                ((ch+1)->rc_discrim == 0) /* last */
> > +                                );
> > +                   if (ret < 0)
> > +                           goto err;
> > +                   byte_count -= ret;
> > +                   rs_offset += ret;
> > +                   head->arg.buflen += ret;
> >             }
> > -           atomic_inc(&rdma_stat_read);
> > -
> > -           if (read_wr.num_sge < chl_map->ch[ch_no].count) {
> > -                   chl_map->ch[ch_no].count -= read_wr.num_sge;
> > -                   chl_map->ch[ch_no].start += read_wr.num_sge;
> > -                   goto next_sge;
> > -           }
> > -           sgl_offset = 0;
> > -           err = 1;
> >     }
> > -
> > - out:
> > -   svc_rdma_put_req_map(rpl_map);
> > -   svc_rdma_put_req_map(chl_map);
> > -
> > +   ret = 1;
> > + err:
> >     /* Detach arg pages. svc_recv will replenish them */
> > -   for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
> > -           rqstp->rq_pages[ch_no] = NULL;
> > +   for (page_no = 0;
> > +        &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
> > +           rqstp->rq_pages[page_no] = NULL;
> >
> > -   return err;
> > +   return ret;
> >  }
> >
> >  static int rdma_read_complete(struct svc_rqst *rqstp,
> > @@ -595,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> >                               struct svc_rdma_op_ctxt,
> >                               dto_q);
> >             list_del_init(&ctxt->dto_q);
> > -   }
> > -   if (ctxt) {
> >             spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
> >             return rdma_read_complete(rqstp, ctxt);
> > -   }
> > -
> > -   if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
> > +   } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
> >             ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
> >                               struct svc_rdma_op_ctxt,
> >                               dto_q);
> > @@ -621,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> >             if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
> >                     goto close_out;
> >
> > -           BUG_ON(ret);
> >             goto out;
> >     }
> >     dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
> > @@ -644,12 +548,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> >     }
> >
> >     /* Read read-list data. */
> > -   ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
> > +   ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
> >     if (ret > 0) {
> >             /* read-list posted, defer until data received from client. */
> >             goto defer;
> > -   }
> > -   if (ret < 0) {
> > +   } else if (ret < 0) {
> >             /* Post of read-list failed, free context. */
> >             svc_rdma_put_context(ctxt, 1);
> >             return 0;
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > index 7e024a5..49fd21a 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > @@ -1,4 +1,5 @@
> >  /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> >   * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> >   *
> >   * This software is available to you under a choice of one of two
> > @@ -49,152 +50,6 @@
> >
> >  #define RPCDBG_FACILITY    RPCDBG_SVCXPRT
> >
> > -/* Encode an XDR as an array of IB SGE
> > - *
> > - * Assumptions:
> > - * - head[0] is physically contiguous.
> > - * - tail[0] is physically contiguous.
> > - * - pages[] is not physically or virtually contiguous and consists of
> > - *   PAGE_SIZE elements.
> > - *
> > - * Output:
> > - * SGE[0]              reserved for RCPRDMA header
> > - * SGE[1]              data from xdr->head[]
> > - * SGE[2..sge_count-2] data from xdr->pages[]
> > - * SGE[sge_count-1]    data from xdr->tail.
> > - *
> > - * The max SGE we need is the length of the XDR / pagesize + one for
> > - * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES
> > - * reserves a page for both the request and the reply header, and this
> > - * array is only concerned with the reply we are assured that we have
> > - * on extra page for the RPCRMDA header.
> > - */
> > -static int fast_reg_xdr(struct svcxprt_rdma *xprt,
> > -                   struct xdr_buf *xdr,
> > -                   struct svc_rdma_req_map *vec)
> > -{
> > -   int sge_no;
> > -   u32 sge_bytes;
> > -   u32 page_bytes;
> > -   u32 page_off;
> > -   int page_no = 0;
> > -   u8 *frva;
> > -   struct svc_rdma_fastreg_mr *frmr;
> > -
> > -   frmr = svc_rdma_get_frmr(xprt);
> > -   if (IS_ERR(frmr))
> > -           return -ENOMEM;
> > -   vec->frmr = frmr;
> > -
> > -   /* Skip the RPCRDMA header */
> > -   sge_no = 1;
> > -
> > -   /* Map the head. */
> > -   frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
> > -   vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
> > -   vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
> > -   vec->count = 2;
> > -   sge_no++;
> > -
> > -   /* Map the XDR head */
> > -   frmr->kva = frva;
> > -   frmr->direction = DMA_TO_DEVICE;
> > -   frmr->access_flags = 0;
> > -   frmr->map_len = PAGE_SIZE;
> > -   frmr->page_list_len = 1;
> > -   page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
> > -   frmr->page_list->page_list[page_no] =
> > -           ib_dma_map_page(xprt->sc_cm_id->device,
> > -                           virt_to_page(xdr->head[0].iov_base),
> > -                           page_off,
> > -                           PAGE_SIZE - page_off,
> > -                           DMA_TO_DEVICE);
> > -   if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -                            frmr->page_list->page_list[page_no]))
> > -           goto fatal_err;
> > -   atomic_inc(&xprt->sc_dma_used);
> > -
> > -   /* Map the XDR page list */
> > -   page_off = xdr->page_base;
> > -   page_bytes = xdr->page_len + page_off;
> > -   if (!page_bytes)
> > -           goto encode_tail;
> > -
> > -   /* Map the pages */
> > -   vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
> > -   vec->sge[sge_no].iov_len = page_bytes;
> > -   sge_no++;
> > -   while (page_bytes) {
> > -           struct page *page;
> > -
> > -           page = xdr->pages[page_no++];
> > -           sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
> > -           page_bytes -= sge_bytes;
> > -
> > -           frmr->page_list->page_list[page_no] =
> > -                   ib_dma_map_page(xprt->sc_cm_id->device,
> > -                                   page, page_off,
> > -                                   sge_bytes, DMA_TO_DEVICE);
> > -           if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -                                    frmr->page_list->page_list[page_no]))
> > -                   goto fatal_err;
> > -
> > -           atomic_inc(&xprt->sc_dma_used);
> > -           page_off = 0; /* reset for next time through loop */
> > -           frmr->map_len += PAGE_SIZE;
> > -           frmr->page_list_len++;
> > -   }
> > -   vec->count++;
> > -
> > - encode_tail:
> > -   /* Map tail */
> > -   if (0 == xdr->tail[0].iov_len)
> > -           goto done;
> > -
> > -   vec->count++;
> > -   vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
> > -
> > -   if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
> > -       ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
> > -           /*
> > -            * If head and tail use the same page, we don't need
> > -            * to map it again.
> > -            */
> > -           vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
> > -   } else {
> > -           void *va;
> > -
> > -           /* Map another page for the tail */
> > -           page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
> > -           va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
> > -           vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
> > -
> > -           frmr->page_list->page_list[page_no] =
> > -               ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va),
> > -                               page_off,
> > -                               PAGE_SIZE,
> > -                               DMA_TO_DEVICE);
> > -           if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -                                    frmr->page_list->page_list[page_no]))
> > -                   goto fatal_err;
> > -           atomic_inc(&xprt->sc_dma_used);
> > -           frmr->map_len += PAGE_SIZE;
> > -           frmr->page_list_len++;
> > -   }
> > -
> > - done:
> > -   if (svc_rdma_fastreg(xprt, frmr))
> > -           goto fatal_err;
> > -
> > -   return 0;
> > -
> > - fatal_err:
> > -   printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
> > -   vec->frmr = NULL;
> > -   svc_rdma_put_frmr(xprt, frmr);
> > -   return -EIO;
> > -}
> > -
> >  static int map_xdr(struct svcxprt_rdma *xprt,
> >                struct xdr_buf *xdr,
> >                struct svc_rdma_req_map *vec)
> > @@ -208,9 +63,6 @@ static int map_xdr(struct svcxprt_rdma *xprt,
> >     BUG_ON(xdr->len !=
> >            (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
> >
> > -   if (xprt->sc_frmr_pg_list_len)
> > -           return fast_reg_xdr(xprt, xdr, vec);
> > -
> >     /* Skip the first sge, this is for the RPCRDMA header */
> >     sge_no = 1;
> >
> > @@ -282,8 +134,6 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
> >  }
> >
> >  /* Assumptions:
> > - * - We are using FRMR
> > - *     - or -
> >   * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
> >   */
> >  static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> > @@ -327,23 +177,16 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> >             sge_bytes = min_t(size_t,
> >                       bc, vec->sge[xdr_sge_no].iov_len-sge_off);
> >             sge[sge_no].length = sge_bytes;
> > -           if (!vec->frmr) {
> > -                   sge[sge_no].addr =
> > -                           dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
> > -                                       sge_bytes, DMA_TO_DEVICE);
> > -                   xdr_off += sge_bytes;
> > -                   if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -                                            sge[sge_no].addr))
> > -                           goto err;
> > -                   atomic_inc(&xprt->sc_dma_used);
> > -                   sge[sge_no].lkey = xprt->sc_dma_lkey;
> > -           } else {
> > -                   sge[sge_no].addr = (unsigned long)
> > -                           vec->sge[xdr_sge_no].iov_base + sge_off;
> > -                   sge[sge_no].lkey = vec->frmr->mr->lkey;
> > -           }
> > +           sge[sge_no].addr =
> > +                   dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
> > +                               sge_bytes, DMA_TO_DEVICE);
> > +           xdr_off += sge_bytes;
> > +           if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > +                                    sge[sge_no].addr))
> > +                   goto err;
> > +           atomic_inc(&xprt->sc_dma_used);
> > +           sge[sge_no].lkey = xprt->sc_dma_lkey;
> >             ctxt->count++;
> > -           ctxt->frmr = vec->frmr;
> >             sge_off = 0;
> >             sge_no++;
> >             xdr_sge_no++;
> > @@ -369,7 +212,6 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> >     return 0;
> >   err:
> >     svc_rdma_unmap_dma(ctxt);
> > -   svc_rdma_put_frmr(xprt, vec->frmr);
> >     svc_rdma_put_context(ctxt, 0);
> >     /* Fatal error, close transport */
> >     return -EIO;
> > @@ -397,10 +239,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
> >     res_ary = (struct rpcrdma_write_array *)
> >             &rdma_resp->rm_body.rm_chunks[1];
> >
> > -   if (vec->frmr)
> > -           max_write = vec->frmr->map_len;
> > -   else
> > -           max_write = xprt->sc_max_sge * PAGE_SIZE;
> > +   max_write = xprt->sc_max_sge * PAGE_SIZE;
> >
> >     /* Write chunks start at the pagelist */
> >     for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
> > @@ -472,10 +311,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
> >     res_ary = (struct rpcrdma_write_array *)
> >             &rdma_resp->rm_body.rm_chunks[2];
> >
> > -   if (vec->frmr)
> > -           max_write = vec->frmr->map_len;
> > -   else
> > -           max_write = xprt->sc_max_sge * PAGE_SIZE;
> > +   max_write = xprt->sc_max_sge * PAGE_SIZE;
> >
> >     /* xdr offset starts at RPC message */
> >     nchunks = ntohl(arg_ary->wc_nchunks);
> > @@ -545,7 +381,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >                   int byte_count)
> >  {
> >     struct ib_send_wr send_wr;
> > -   struct ib_send_wr inv_wr;
> >     int sge_no;
> >     int sge_bytes;
> >     int page_no;
> > @@ -559,7 +394,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >                    "svcrdma: could not post a receive buffer, err=%d."
> >                    "Closing transport %p.\n", ret, rdma);
> >             set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
> > -           svc_rdma_put_frmr(rdma, vec->frmr);
> >             svc_rdma_put_context(ctxt, 0);
> >             return -ENOTCONN;
> >     }
> > @@ -567,11 +401,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >     /* Prepare the context */
> >     ctxt->pages[0] = page;
> >     ctxt->count = 1;
> > -   ctxt->frmr = vec->frmr;
> > -   if (vec->frmr)
> > -           set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > -   else
> > -           clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> >
> >     /* Prepare the SGE for the RPCRDMA Header */
> >     ctxt->sge[0].lkey = rdma->sc_dma_lkey;
> > @@ -590,21 +419,15 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >             int xdr_off = 0;
> >             sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
> >             byte_count -= sge_bytes;
> > -           if (!vec->frmr) {
> > -                   ctxt->sge[sge_no].addr =
> > -                           dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
> > -                                       sge_bytes, DMA_TO_DEVICE);
> > -                   xdr_off += sge_bytes;
> > -                   if (ib_dma_mapping_error(rdma->sc_cm_id->device,
> > -                                            ctxt->sge[sge_no].addr))
> > -                           goto err;
> > -                   atomic_inc(&rdma->sc_dma_used);
> > -                   ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
> > -           } else {
> > -                   ctxt->sge[sge_no].addr = (unsigned long)
> > -                           vec->sge[sge_no].iov_base;
> > -                   ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
> > -           }
> > +           ctxt->sge[sge_no].addr =
> > +                   dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
> > +                               sge_bytes, DMA_TO_DEVICE);
> > +           xdr_off += sge_bytes;
> > +           if (ib_dma_mapping_error(rdma->sc_cm_id->device,
> > +                                    ctxt->sge[sge_no].addr))
> > +                   goto err;
> > +           atomic_inc(&rdma->sc_dma_used);
> > +           ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
> >             ctxt->sge[sge_no].length = sge_bytes;
> >     }
> >     BUG_ON(byte_count != 0);
> > @@ -627,6 +450,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >                     ctxt->sge[page_no+1].length = 0;
> >     }
> >     rqstp->rq_next_page = rqstp->rq_respages + 1;
> > +
> >     BUG_ON(sge_no > rdma->sc_max_sge);
> >     memset(&send_wr, 0, sizeof send_wr);
> >     ctxt->wr_op = IB_WR_SEND;
> > @@ -635,15 +459,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >     send_wr.num_sge = sge_no;
> >     send_wr.opcode = IB_WR_SEND;
> >     send_wr.send_flags =  IB_SEND_SIGNALED;
> > -   if (vec->frmr) {
> > -           /* Prepare INVALIDATE WR */
> > -           memset(&inv_wr, 0, sizeof inv_wr);
> > -           inv_wr.opcode = IB_WR_LOCAL_INV;
> > -           inv_wr.send_flags = IB_SEND_SIGNALED;
> > -           inv_wr.ex.invalidate_rkey =
> > -                   vec->frmr->mr->lkey;
> > -           send_wr.next = &inv_wr;
> > -   }
> >
> >     ret = svc_rdma_send(rdma, &send_wr);
> >     if (ret)
> > @@ -653,7 +468,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >
> >   err:
> >     svc_rdma_unmap_dma(ctxt);
> > -   svc_rdma_put_frmr(rdma, vec->frmr);
> >     svc_rdma_put_context(ctxt, 1);
> >     return -EIO;
> >  }
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > index 25688fa..2c5b201 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > @@ -1,4 +1,5 @@
> >  /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> >   * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
> >   *
> >   * This software is available to you under a choice of one of two
> > @@ -160,7 +161,6 @@ struct svc_rdma_req_map *svc_rdma_get_req_map(void)
> >             schedule_timeout_uninterruptible(msecs_to_jiffies(500));
> >     }
> >     map->count = 0;
> > -   map->frmr = NULL;
> >     return map;
> >  }
> >
> > @@ -336,22 +336,21 @@ static void process_context(struct svcxprt_rdma *xprt,
> >
> >     switch (ctxt->wr_op) {
> >     case IB_WR_SEND:
> > -           if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
> > -                   svc_rdma_put_frmr(xprt, ctxt->frmr);
> > +           BUG_ON(ctxt->frmr);
> >             svc_rdma_put_context(ctxt, 1);
> >             break;
> >
> >     case IB_WR_RDMA_WRITE:
> > +           BUG_ON(ctxt->frmr);
> >             svc_rdma_put_context(ctxt, 0);
> >             break;
> >
> >     case IB_WR_RDMA_READ:
> >     case IB_WR_RDMA_READ_WITH_INV:
> > +           svc_rdma_put_frmr(xprt, ctxt->frmr);
> >             if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
> >                     struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
> >                     BUG_ON(!read_hdr);
> > -                   if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
> > -                           svc_rdma_put_frmr(xprt, ctxt->frmr);
> >                     spin_lock_bh(&xprt->sc_rq_dto_lock);
> >                     set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
> >                     list_add_tail(&read_hdr->dto_q,
> > @@ -363,6 +362,7 @@ static void process_context(struct svcxprt_rdma *xprt,
> >             break;
> >
> >     default:
> > +           BUG_ON(1);
> >             printk(KERN_ERR "svcrdma: unexpected completion type, "
> >                    "opcode=%d\n",
> >                    ctxt->wr_op);
> > @@ -378,29 +378,42 @@ static void process_context(struct svcxprt_rdma *xprt,
> >  static void sq_cq_reap(struct svcxprt_rdma *xprt)
> >  {
> >     struct svc_rdma_op_ctxt *ctxt = NULL;
> > -   struct ib_wc wc;
> > +   struct ib_wc wc_a[6];
> > +   struct ib_wc *wc;
> >     struct ib_cq *cq = xprt->sc_sq_cq;
> >     int ret;
> >
> > +   memset(wc_a, 0, sizeof(wc_a));
> > +
> >     if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
> >             return;
> >
> >     ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> >     atomic_inc(&rdma_stat_sq_poll);
> > -   while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
> > -           if (wc.status != IB_WC_SUCCESS)
> > -                   /* Close the transport */
> > -                   set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > +   while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
> > +           int i;
> >
> > -           /* Decrement used SQ WR count */
> > -           atomic_dec(&xprt->sc_sq_count);
> > -           wake_up(&xprt->sc_send_wait);
> > +           for (i = 0; i < ret; i++) {
> > +                   wc = &wc_a[i];
> > +                   if (wc->status != IB_WC_SUCCESS) {
> > +                           dprintk("svcrdma: sq wc err status %d\n",
> > +                                   wc->status);
> >
> > -           ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
> > -           if (ctxt)
> > -                   process_context(xprt, ctxt);
> > +                           /* Close the transport */
> > +                           set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > +                   }
> >
> > -           svc_xprt_put(&xprt->sc_xprt);
> > +                   /* Decrement used SQ WR count */
> > +                   atomic_dec(&xprt->sc_sq_count);
> > +                   wake_up(&xprt->sc_send_wait);
> > +
> > +                   ctxt = (struct svc_rdma_op_ctxt *)
> > +                           (unsigned long)wc->wr_id;
> > +                   if (ctxt)
> > +                           process_context(xprt, ctxt);
> > +
> > +                   svc_xprt_put(&xprt->sc_xprt);
> > +           }
> >     }
> >
> >     if (ctxt)
> > @@ -993,7 +1006,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> >                     need_dma_mr = 0;
> >             break;
> >     case RDMA_TRANSPORT_IB:
> > -           if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
> > +           if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
> > +                   need_dma_mr = 1;
> > +                   dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
> > +           } else if (!(devattr.device_cap_flags &
> > +                        IB_DEVICE_LOCAL_DMA_LKEY)) {
> >                     need_dma_mr = 1;
> >                     dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
> >             } else
> > @@ -1190,14 +1207,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
> >             container_of(xprt, struct svcxprt_rdma, sc_xprt);
> >
> >     /*
> > -    * If there are fewer SQ WR available than required to send a
> > -    * simple response, return false.
> > -    */
> > -   if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
> > -           return 0;
> > -
> > -   /*
> > -    * ...or there are already waiters on the SQ,
> > +    * If there are already waiters on the SQ,
> >      * return false.
> >      */
> >     if (waitqueue_active(&rdma->sc_send_wait))
> >

--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
