From: Mike Christie <micha...@cs.wisc.edu>

This adds support for a scatterlist rbd obj_request_type, so LIO
can pass down its sg to rbd.

Signed-off-by: Mike Christie <micha...@cs.wisc.edu>
---
 drivers/block/rbd.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 94 insertions(+), 11 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 60257cf..bc0466c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -44,6 +44,7 @@
 #include <linux/slab.h>
 #include <linux/idr.h>
 #include <linux/workqueue.h>
+#include <linux/scatterlist.h>
 
 #include "rbd_types.h"
 
@@ -208,7 +209,7 @@ struct rbd_obj_request;
 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
 enum obj_request_type {
-       OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
+       OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES, OBJ_REQUEST_SG,
 };
 
 enum obj_operation_type {
@@ -264,6 +265,10 @@ struct rbd_obj_request {
                        struct page     **pages;
                        u32             page_count;
                };
+               struct {
+                       struct scatterlist      *sg;
+                       unsigned int            init_sg_offset;
+               };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
@@ -295,16 +300,22 @@ struct rbd_img_request {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
-       union {
-               struct request          *rq;            /* block request */
-               struct rbd_obj_request  *obj_request;   /* obj req initiator */
-       };
+
+       struct request          *rq;            /* block request */
+       struct rbd_obj_request  *obj_request;   /* obj req initiator */
+
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
+       /*
+        * xferred is the bytes that have successfully been transferred.
+        * completed is the bytes that have been accounted for and includes
+        * both failed and successfully transferred bytes.
+        */
        u64                     xferred;/* aggregate bytes transferred */
+       u64                     completed;
        int                     result; /* first nonzero obj_request result */
 
        u32                     obj_request_count;
@@ -1273,6 +1284,34 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
        }
 }
 
+/*
+ * Zero bytes [start, start + length) of @sgl, counting sg->length bytes
+ * from the first entry; @length is a byte count, not an end offset.  Only
+ * sg_page() of each entry is mapped (assumes no entry spans pages - TODO).
+ */
+static void zero_sg(struct scatterlist *sgl, u64 start, u64 length)
+{
+       struct scatterlist *sg;
+       u64 end = start + length;
+       u64 pos = 0;
+
+       for (sg = sgl; sg && pos < end; pos += sg->length, sg = sg_next(sg)) {
+               u64 from = max(pos, start);
+               u64 to = min_t(u64, pos + sg->length, end);
+               unsigned long flags;
+               void *kaddr;
+
+               if (from >= to)
+                       continue;       /* nothing to zero in this entry */
+               local_irq_save(flags);
+               kaddr = kmap_atomic(sg_page(sg));
+               memset(kaddr + sg->offset + (from - pos), 0, to - from);
+               flush_dcache_page(sg_page(sg));
+               kunmap_atomic(kaddr);
+               local_irq_restore(flags);
+       }
+}
+
 /*
  * similar to zero_bio_chain(), zeros data defined by a page array,
  * starting at the given byte offset from the start of the array and
@@ -1547,6 +1586,7 @@ static bool obj_request_type_valid(enum obj_request_type type)
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
+       case OBJ_REQUEST_SG:
                return true;
        default:
                return false;
@@ -1730,14 +1770,18 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
-               else
+               else if (obj_request->type == OBJ_REQUEST_PAGES)
                        zero_pages(obj_request->pages, 0, length);
+               else if (obj_request->type == OBJ_REQUEST_SG)
+                       zero_sg(obj_request->sg, 0, length);
                obj_request->result = 0;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
-               else
+               else if (obj_request->type == OBJ_REQUEST_PAGES)
                        zero_pages(obj_request->pages, xferred, length);
+               else if (obj_request->type == OBJ_REQUEST_SG)
+                       zero_sg(obj_request->sg, xferred, length);
        }
        obj_request->xferred = length;
        obj_request_done_set(obj_request);
@@ -2067,6 +2111,7 @@ static void rbd_obj_request_destroy(struct kref *kref)
        rbd_assert(obj_request_type_valid(obj_request->type));
        switch (obj_request->type) {
        case OBJ_REQUEST_NODATA:
+       case OBJ_REQUEST_SG:
                break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
                if (obj_request->bio_list)
@@ -2168,6 +2213,7 @@ static struct rbd_img_request *rbd_img_request_create(
        img_request->offset = offset;
        img_request->length = length;
        img_request->flags = 0;
+       img_request->completed = 0;
        if (op_type == OBJ_OP_DISCARD) {
                img_request_discard_set(img_request);
                img_request->snapc = snapc;
@@ -2293,6 +2339,7 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
                 */
                xferred = obj_request->length;
        }
+       img_request->completed += xferred;
 
        /* Image object requests don't own their page array */
 
@@ -2304,12 +2351,15 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
        if (img_request_child_test(img_request)) {
                rbd_assert(img_request->obj_request != NULL);
                more = obj_request->which < img_request->obj_request_count - 1;
-       } else {
-               rbd_assert(img_request->rq != NULL);
-
+       } else if (img_request->rq) {
                more = blk_update_request(img_request->rq, result, xferred);
                if (!more)
                        __blk_mq_end_request(img_request->rq, result);
+       } else {
+               if (img_request->completed < img_request->length)
+                       more = true;
+               else
+                       more = false;
        }
 
        return more;
@@ -2411,6 +2461,10 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
                osd_req_op_extent_osd_data_pages(osd_request, num_ops,
                                        obj_request->pages, length,
                                        offset & ~PAGE_MASK, false, false);
+       else if (obj_request->type == OBJ_REQUEST_SG)
+               osd_req_op_extent_osd_data_sg(osd_request, num_ops,
+                                       obj_request->sg,
+                                       obj_request->init_sg_offset, length);
 
        /* Discards are also writes */
        if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
@@ -2436,7 +2490,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        struct rbd_obj_request *next_obj_request;
        struct bio *bio_list = NULL;
        unsigned int bio_offset = 0;
+       unsigned int sg_offset = 0;
        struct page **pages = NULL;
+       struct scatterlist *sgl = NULL;
        enum obj_operation_type op_type;
        u64 img_offset;
        u64 resid;
@@ -2455,6 +2511,8 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                           bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
        } else if (type == OBJ_REQUEST_PAGES) {
                pages = data_desc;
+       } else if (type == OBJ_REQUEST_SG) {
+               sgl = data_desc;
        }
 
        while (resid) {
@@ -2502,6 +2560,27 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                        if ((offset + length) & ~PAGE_MASK)
                                page_count--;   /* more on last page */
                        pages += page_count;
+               } else if (type == OBJ_REQUEST_SG) {
+                       u64 sg_length = 0;
+
+                       obj_request->init_sg_offset = sg_offset;
+                       obj_request->sg = sgl;
+                       do {
+                               sg_length += (sgl->length - sg_offset);
+                               sg_offset = 0;
+                               if (sg_length > length) {
+                                       sg_offset = sgl->length -
+                                                       (sg_length - length);
+                                       break;
+                               }
+                               /*
+                                * For WRITE_SAME we have a single sg that
+                                * is written possibly multiple times over
+                                * img_request->length bytes.
+                                */
+                               if (sg_next(sgl))
+                                       sgl = sg_next(sgl);
+                       } while (true);
                }
 
                osd_req = rbd_osd_req_create(rbd_dev, op_type,
@@ -3058,9 +3137,13 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
        if (obj_request->type == OBJ_REQUEST_BIO)
                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
                                                obj_request->bio_list);
-       else
+       else if (obj_request->type == OBJ_REQUEST_PAGES)
                result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
                                                obj_request->pages);
+       else
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_SG,
+                                               obj_request->sg);
+
        if (result)
                goto out_err;
 
-- 
1.8.3.1

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to