From: Mike Christie <micha...@cs.wisc.edu> LIO uses scatterlist for its page/data management. This patch adds a scatterlist messenger data type, so LIO can pass its sg down directly to rbd.
Signed-off-by: Mike Christie <micha...@cs.wisc.edu>
---
 include/linux/ceph/messenger.h  |  13 +++++
 include/linux/ceph/osd_client.h |  12 ++++-
 net/ceph/messenger.c            | 110 +++++++++++++++++++++++++++++++++++++++++
 net/ceph/osd_client.c           |  26 ++++++++++
 4 files changed, 160 insertions(+), 1 deletion(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 3775327..bc1bde8 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -79,6 +79,7 @@ enum ceph_msg_data_type {
 #ifdef CONFIG_BLOCK
 	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
 #endif /* CONFIG_BLOCK */
+	CEPH_MSG_DATA_SG,	/* data source/destination is a scatterlist */
 };
 
 static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
@@ -90,6 +91,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_SG:
 		return true;
 	default:
 		return false;
@@ -112,6 +114,11 @@ struct ceph_msg_data {
 			unsigned int	alignment;	/* first page */
 		};
 		struct ceph_pagelist	*pagelist;
+		struct {
+			struct scatterlist *sgl;	/* first sg entry */
+			unsigned int	sgl_init_offset; /* skip in first sg */
+			u64		sgl_length;	/* total # bytes */
+		};
 	};
 };
 
@@ -139,6 +146,10 @@ struct ceph_msg_data_cursor {
 			struct page	*page;		/* page from list */
 			size_t		offset;		/* bytes from list */
 		};
+		struct {
+			struct scatterlist	*sg;		/* curr sg */
+			unsigned int		sg_consumed;	/* bytes used in curr sg */
+		};
 	};
 };
 
@@ -294,6 +305,8 @@ extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 				size_t length);
 #endif /* CONFIG_BLOCK */
+extern void ceph_msg_data_add_sg(struct ceph_msg *msg, struct scatterlist *sgl,
+				 unsigned int sgl_init_offset, u64 length);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 0890167..2152f06 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -52,6 +52,7 @@ enum ceph_osd_data_type {
 #ifdef CONFIG_BLOCK
 	CEPH_OSD_DATA_TYPE_BIO,
 #endif /* CONFIG_BLOCK */
+	CEPH_OSD_DATA_TYPE_SG,
 };
 
 struct ceph_osd_data {
@@ -70,6 +71,11 @@ struct ceph_osd_data {
 			struct bio	*bio;		/* list of bios */
 			size_t		bio_length;	/* total in list */
 		};
 #endif /* CONFIG_BLOCK */
+		struct {
+			struct scatterlist *sgl;
+			u64		sgl_length;
+			unsigned int	sgl_init_offset;
+		};
 	};
 };
@@ -313,7 +319,11 @@ extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *,
 					unsigned int which,
 					struct bio *bio, size_t bio_length);
 #endif /* CONFIG_BLOCK */
-
+extern void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *,
+					unsigned int which,
+					struct scatterlist *sgl,
+					unsigned int init_sg_offset,
+					u64 length);
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
 					unsigned int which,
 					struct ceph_pagelist *pagelist);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e3be1d2..08d39fb 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -893,6 +893,84 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
 #endif /* CONFIG_BLOCK */
 
 /*
+ * For a sg data item, a piece is whatever remains of the current
+ * sg entry, or the whole of the next entry in the list.  The first
+ * entry is entered sgl_init_offset bytes past its own offset.
+ */
+static void ceph_msg_data_sg_cursor_init(struct ceph_msg_data_cursor *cursor,
+					size_t length)
+{
+	struct ceph_msg_data *data = cursor->data;
+	struct scatterlist *sg;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_SG);
+
+	sg = data->sgl;
+	BUG_ON(!sg);
+
+	cursor->resid = min_t(u64, length, data->sgl_length);
+	cursor->sg = sg;
+	cursor->sg_consumed = data->sgl_init_offset;
+	/* the first entry only holds sg->length minus the initial offset */
+	cursor->last_piece = cursor->resid + cursor->sg_consumed <= sg->length;
+}
+
+/*
+ * Return the page of the current sg entry, and report the offset into
+ * that page and the number of bytes that make up the current piece.
+ */
+static struct page *ceph_msg_data_sg_next(struct ceph_msg_data_cursor *cursor,
+					  size_t *page_offset, size_t *length)
+{
+	struct ceph_msg_data *data = cursor->data;
+	struct scatterlist *sg;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_SG);
+
+	sg = cursor->sg;
+	BUG_ON(!sg);
+
+	*page_offset = sg->offset + cursor->sg_consumed;
+
+	if (cursor->last_piece)
+		*length = cursor->resid;
+	else
+		*length = sg->length - cursor->sg_consumed;
+
+	/* currently support non clustered sg pages */
+	return sg_page(sg);
+}
+
+/*
+ * Consume bytes from the current piece.  Returns true only when the
+ * cursor has moved on to a new piece; false if bytes is zero, the
+ * current entry still has data, or no data remains at all.
+ */
+static bool ceph_msg_data_sg_advance(struct ceph_msg_data_cursor *cursor,
+				     size_t bytes)
+{
+	BUG_ON(cursor->data->type != CEPH_MSG_DATA_SG);
+
+	/* Advance the cursor offset */
+	BUG_ON(cursor->resid < bytes);
+	cursor->resid -= bytes;
+	cursor->sg_consumed += bytes;
+
+	if (!bytes || cursor->sg_consumed < cursor->sg->length)
+		return false;	/* more bytes to process in the current page */
+
+	if (!cursor->resid)
+		return false;	/* no more data */
+
+	/* For WRITE_SAME we have a single sg that is written over and over */
+	if (sg_next(cursor->sg))
+		cursor->sg = sg_next(cursor->sg);
+	cursor->sg_consumed = 0;
+
+	cursor->last_piece = cursor->resid <= cursor->sg->length;
+	return true;
+}
+
+/*
  * For a page array, a piece comes from the first page in the array
  * that has not already been fully consumed.
  */
@@ -1075,6 +1153,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
 		ceph_msg_data_bio_cursor_init(cursor, length);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_SG:
+		ceph_msg_data_sg_cursor_init(cursor, length);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		/* BUG(); */
@@ -1123,6 +1204,9 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
 		page = ceph_msg_data_bio_next(cursor, page_offset, length);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_SG:
+		page = ceph_msg_data_sg_next(cursor, page_offset, length);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		page = NULL;
@@ -1159,6 +1243,9 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
 		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_SG:
+		new_piece = ceph_msg_data_sg_advance(cursor, bytes);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		BUG();
@@ -3182,6 +3269,29 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 EXPORT_SYMBOL(ceph_msg_data_add_bio);
 #endif /* CONFIG_BLOCK */
 
+/*
+ * Append a scatterlist data item to the message's data list.
+ * sgl_init_offset is the number of bytes to skip in the first sg
+ * entry; length is added to the message's total data length.
+ */
+void ceph_msg_data_add_sg(struct ceph_msg *msg, struct scatterlist *sgl,
+			  unsigned int sgl_init_offset, u64 length)
+{
+	struct ceph_msg_data *data;
+
+	BUG_ON(!sgl);
+
+	data = ceph_msg_data_create(CEPH_MSG_DATA_SG);
+	BUG_ON(!data);
+	data->sgl = sgl;
+	data->sgl_length = length;
+	data->sgl_init_offset = sgl_init_offset;
+
+	list_add_tail(&data->links, &msg->data);
+	msg->data_length += length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_sg);
+
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index f8178b7..fd0a52e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -128,6 +128,16 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
 }
 #endif /* CONFIG_BLOCK */
 
+static void ceph_osd_data_sg_init(struct ceph_osd_data *osd_data,
+				  struct scatterlist *sgl,
+				  unsigned int init_sg_offset, u64 length)
+{
+	osd_data->type = CEPH_OSD_DATA_TYPE_SG;
+	osd_data->sgl = sgl;
+	osd_data->sgl_length = length;
+	osd_data->sgl_init_offset = init_sg_offset;
+}
+
 #define osd_req_op_data(oreq, whch, typ, fld)	\
 	({						\
 		BUG_ON(whch >= (oreq)->r_num_ops);	\
@@ -206,6 +216,17 @@ void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
 
+void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *osd_req,
+			unsigned int which, struct scatterlist *sgl,
+			unsigned int init_sg_offset, u64 length)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+	ceph_osd_data_sg_init(osd_data, sgl, init_sg_offset, length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_sg);
+
 static void osd_req_op_cls_request_info_pagelist(
 			struct ceph_osd_request *osd_req,
 			unsigned int which, struct ceph_pagelist *pagelist)
@@ -317,6 +338,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 	case CEPH_OSD_DATA_TYPE_BIO:
 		return (u64)osd_data->bio_length;
 #endif /* CONFIG_BLOCK */
+	case CEPH_OSD_DATA_TYPE_SG:
+		return osd_data->sgl_length;
 	default:
 		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
 		return 0;
@@ -727,6 +750,9 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
 		ceph_msg_data_add_bio(msg, osd_data->bio, length);
 #endif
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_SG) {
+		ceph_msg_data_add_sg(msg, osd_data->sgl,
+				     osd_data->sgl_init_offset, length);
 	} else {
 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
 	}
-- 
1.8.3.1

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html