This patch introduces retransmit functionality for a lowcomms message
handle. It just allocates a new buffer and transmits it again; there is no
special handling to prioritize it, because the bytestream must stay in order.

To avoid another connection lookup, some refactoring was done so that a new
buffer can be allocated with a preexisting connection pointer.

Signed-off-by: Alexander Aring <aahri...@redhat.com>
---
 fs/dlm/lowcomms.c | 85 ++++++++++++++++++++++++++++++++++++-----------
 fs/dlm/lowcomms.h |  1 +
 2 files changed, 67 insertions(+), 19 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index d222e6088ab2..df9827ec12f3 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -126,6 +126,8 @@ struct writequeue_entry {
 
 struct dlm_msg {
        struct writequeue_entry *entry;
+       struct dlm_msg *orig_msg;
+       bool retransmit;
        void *ppc;
        int len;
        int idx; /* new()/commit() idx exchange */
@@ -1055,6 +1057,10 @@ static void free_entry(struct writequeue_entry *e)
        struct dlm_msg *msg, *tmp;
 
        list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+               if (msg->orig_msg) {
+                       msg->orig_msg->retransmit = false;
+                       kref_put(&msg->orig_msg->ref, dlm_msg_release);
+               }
                list_del(&msg->list);
                kref_put(&msg->ref, dlm_msg_release);
        }
@@ -1494,11 +1500,37 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
        return e;
 };
 
+static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
+                                               gfp_t allocation, char **ppc,
+                                               void (*cb)(struct dlm_mhandle *mh),
+                                               struct dlm_mhandle *mh)
+{
+       struct writequeue_entry *e;
+       struct dlm_msg *msg;
+
+       msg = kzalloc(sizeof(*msg), allocation);
+       if (!msg)
+               return NULL;
+
+       kref_init(&msg->ref);
+
+       e = new_wq_entry(con, len, allocation, ppc, cb, mh);
+       if (!e) {
+               kfree(msg);
+               return NULL;
+       }
+
+       msg->ppc = *ppc;
+       msg->len = len;
+       msg->entry = e;
+
+       return msg;
+}
+
 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
                                     char **ppc, void (*cb)(struct dlm_mhandle *mh),
                                     struct dlm_mhandle *mh)
 {
-       struct writequeue_entry *e;
        struct connection *con;
        struct dlm_msg *msg;
        int idx;
@@ -1518,32 +1550,18 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
                return NULL;
        }
 
-       msg = kzalloc(sizeof(*msg), allocation);
+       msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
        if (!msg) {
                srcu_read_unlock(&connections_srcu, idx);
                return NULL;
        }
 
-       kref_init(&msg->ref);
-
-       e = new_wq_entry(con, len, allocation, ppc, cb, mh);
-       if (!e) {
-               srcu_read_unlock(&connections_srcu, idx);
-               kfree(msg);
-               return NULL;
-       }
-
-       msg->ppc = *ppc;
-       msg->len = len;
-       msg->entry = e;
-
        /* we assume if successful commit must called */
        msg->idx = idx;
-
        return msg;
 }
 
-void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
+static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 {
        struct writequeue_entry *e = msg->entry;
        struct connection *con = e->con;
@@ -1561,20 +1579,49 @@ void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
        spin_unlock(&con->writequeue_lock);
 
        queue_work(send_workqueue, &con->swork);
-       srcu_read_unlock(&connections_srcu, msg->idx);
        return;
 
 out:
        spin_unlock(&con->writequeue_lock);
-       srcu_read_unlock(&connections_srcu, msg->idx);
        return;
 }
 
+void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
+{
+       _dlm_lowcomms_commit_msg(msg);
+       srcu_read_unlock(&connections_srcu, msg->idx);
+}
+
 void dlm_lowcomms_put_msg(struct dlm_msg *msg)
 {
        kref_put(&msg->ref, dlm_msg_release);
 }
 
+/* does not hold connections_srcu, for use from workqueue only */
+int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
+{
+       struct dlm_msg *msg_resend;
+       char *ppc;
+
+       if (msg->retransmit)
+               return 1;
+
+       msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
+                                             GFP_ATOMIC, &ppc, NULL, NULL);
+       if (!msg_resend)
+               return -ENOMEM;
+
+       msg->retransmit = true;
+       kref_get(&msg->ref);
+       msg_resend->orig_msg = msg;
+
+       memcpy(ppc, msg->ppc, msg->len);
+       _dlm_lowcomms_commit_msg(msg_resend);
+       dlm_lowcomms_put_msg(msg_resend);
+
+       return 0;
+}
+
 /* Send a message */
 static void send_to_sock(struct connection *con)
 {
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index cdb8f066f0d8..a4384826442c 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -27,6 +27,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
                                     struct dlm_mhandle *mh);
 void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
 void dlm_lowcomms_put_msg(struct dlm_msg *msg);
+int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
 int dlm_lowcomms_connect_node(int nodeid);
 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
-- 
2.26.3

Reply via email to