This patch introduces retransmit functionality for a lowcomms message handle. It simply allocates a new buffer and transmits the message again. The retransmitted message gets no special prioritization, because the bytestream has to be kept in order.
To avoid another connection look some refactor was done to make a new buffer allocation with a preexisting connection pointer. Signed-off-by: Alexander Aring <aahri...@redhat.com> --- fs/dlm/lowcomms.c | 85 ++++++++++++++++++++++++++++++++++++----------- fs/dlm/lowcomms.h | 1 + 2 files changed, 67 insertions(+), 19 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index d222e6088ab2..df9827ec12f3 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -126,6 +126,8 @@ struct writequeue_entry { struct dlm_msg { struct writequeue_entry *entry; + struct dlm_msg *orig_msg; + bool retransmit; void *ppc; int len; int idx; /* new()/commit() idx exchange */ @@ -1055,6 +1057,10 @@ static void free_entry(struct writequeue_entry *e) struct dlm_msg *msg, *tmp; list_for_each_entry_safe(msg, tmp, &e->msgs, list) { + if (msg->orig_msg) { + msg->orig_msg->retransmit = false; + kref_put(&msg->orig_msg->ref, dlm_msg_release); + } list_del(&msg->list); kref_put(&msg->ref, dlm_msg_release); } @@ -1494,11 +1500,37 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, return e; }; +static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, + gfp_t allocation, char **ppc, + void (*cb)(struct dlm_mhandle *mh), + struct dlm_mhandle *mh) +{ + struct writequeue_entry *e; + struct dlm_msg *msg; + + msg = kzalloc(sizeof(*msg), allocation); + if (!msg) + return NULL; + + kref_init(&msg->ref); + + e = new_wq_entry(con, len, allocation, ppc, cb, mh); + if (!e) { + kfree(msg); + return NULL; + } + + msg->ppc = *ppc; + msg->len = len; + msg->entry = e; + + return msg; +} + struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, char **ppc, void (*cb)(struct dlm_mhandle *mh), struct dlm_mhandle *mh) { - struct writequeue_entry *e; struct connection *con; struct dlm_msg *msg; int idx; @@ -1518,32 +1550,18 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, return NULL; } - msg = 
kzalloc(sizeof(*msg), allocation); + msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh); if (!msg) { srcu_read_unlock(&connections_srcu, idx); return NULL; } - kref_init(&msg->ref); - - e = new_wq_entry(con, len, allocation, ppc, cb, mh); - if (!e) { - srcu_read_unlock(&connections_srcu, idx); - kfree(msg); - return NULL; - } - - msg->ppc = *ppc; - msg->len = len; - msg->entry = e; - /* we assume if successful commit must called */ msg->idx = idx; - return msg; } -void dlm_lowcomms_commit_msg(struct dlm_msg *msg) +static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) { struct writequeue_entry *e = msg->entry; struct connection *con = e->con; @@ -1561,20 +1579,49 @@ void dlm_lowcomms_commit_msg(struct dlm_msg *msg) spin_unlock(&con->writequeue_lock); queue_work(send_workqueue, &con->swork); - srcu_read_unlock(&connections_srcu, msg->idx); return; out: spin_unlock(&con->writequeue_lock); - srcu_read_unlock(&connections_srcu, msg->idx); return; } +void dlm_lowcomms_commit_msg(struct dlm_msg *msg) +{ + _dlm_lowcomms_commit_msg(msg); + srcu_read_unlock(&connections_srcu, msg->idx); +} + void dlm_lowcomms_put_msg(struct dlm_msg *msg) { kref_put(&msg->ref, dlm_msg_release); } +/* does not held connections_srcu, usage workqueue only */ +int dlm_lowcomms_resend_msg(struct dlm_msg *msg) +{ + struct dlm_msg *msg_resend; + char *ppc; + + if (msg->retransmit) + return 1; + + msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, + GFP_ATOMIC, &ppc, NULL, NULL); + if (!msg_resend) + return -ENOMEM; + + msg->retransmit = true; + kref_get(&msg->ref); + msg_resend->orig_msg = msg; + + memcpy(ppc, msg->ppc, msg->len); + _dlm_lowcomms_commit_msg(msg_resend); + dlm_lowcomms_put_msg(msg_resend); + + return 0; +} + /* Send a message */ static void send_to_sock(struct connection *con) { diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index cdb8f066f0d8..a4384826442c 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -27,6 +27,7 @@ struct dlm_msg 
*dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, struct dlm_mhandle *mh); void dlm_lowcomms_commit_msg(struct dlm_msg *msg); void dlm_lowcomms_put_msg(struct dlm_msg *msg); +int dlm_lowcomms_resend_msg(struct dlm_msg *msg); int dlm_lowcomms_connect_node(int nodeid); int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); -- 2.26.3