This patch introduces irqsafe retransmit functionality for a lowcomms
message handle. A retransmit simply allocates a new buffer, copies the
original payload into it and queues it for transmission again; there is
no special handling to prioritize it, because the byte stream must be
kept in order. Since a retransmit may be triggered from bottom-half
context (e.g. a timer), the new buffer is allocated with GFP_ATOMIC and
the writequeue lock is switched to the spin_lock_bh() variants.
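
For illustration, a retransmit driven from a timer could look like the
sketch below. The dlm_pending_msg struct and the timer callback are
hypothetical caller-side code, not part of this patch; only
dlm_lowcomms_resend_buffer() is added here.

        /* hypothetical caller-side bookkeeping, not introduced by this patch */
        struct dlm_pending_msg {
                void *mh;               /* handle from dlm_lowcomms_new_buffer() */
                struct timer_list timer;
        };

        /* timer callbacks run in softirq context, which is why the resend
         * path allocates with GFP_ATOMIC and the writequeue lock is taken
         * with the _bh variants
         */
        static void dlm_retransmit_timer_fn(struct timer_list *t)
        {
                struct dlm_pending_msg *pmsg = from_timer(pmsg, t, timer);

                /* allocates a fresh buffer, copies the old payload and
                 * queues it at the tail of the writequeue, keeping the
                 * byte stream in order
                 */
                dlm_lowcomms_resend_buffer(pmsg->mh);
        }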

To avoid another connection lookup on the retransmit path, the buffer
allocation is refactored so that a new buffer can be allocated from a
pre-existing connection pointer.
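
As a rough caller sketch (the wrapper function, nodeid, buf and len are
placeholders, error handling trimmed), the new call fits into the
existing buffer lifecycle like this; note that the resend needs no
nodeid, since the refactored allocation takes the connection pointer
from the message handle:

        static void send_with_possible_resend(int nodeid, void *buf, int len)
        {
                char *ppc;
                void *mh;

                mh = dlm_lowcomms_new_buffer(nodeid, len, GFP_NOFS, &ppc,
                                             NULL, NULL);
                if (!mh)
                        return;

                memcpy(ppc, buf, len);          /* fill the payload */
                dlm_lowcomms_commit_buffer(mh); /* queue it for sending */

                /* e.g. later, if the message has to go out again */
                dlm_lowcomms_resend_buffer(mh); /* copies the payload into a
                                                 * new buffer on the same
                                                 * connection, no second
                                                 * nodeid2con() lookup
                                                 */

                dlm_lowcomms_put_buffer(mh);    /* drop the caller's reference */
        }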

Signed-off-by: Alexander Aring <aahri...@redhat.com>
---
 fs/dlm/lowcomms.c | 84 +++++++++++++++++++++++++++++++----------------
 fs/dlm/lowcomms.h |  1 +
 2 files changed, 56 insertions(+), 29 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index ba782ea84281..d2be58496fd0 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1409,7 +1409,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 {
        struct writequeue_entry *e;
 
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        if (!list_empty(&con->writequeue)) {
                e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
                if (DLM_WQ_REMAIN_BYTES(e) >= len) {
@@ -1421,12 +1421,12 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 
                        e->end += len;
                        e->users++;
-                       spin_unlock(&con->writequeue_lock);
+                       spin_unlock_bh(&con->writequeue_lock);
 
                        return e;
                }
        }
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
 
        e = new_writequeue_entry(con, allocation);
        if (!e)
@@ -1436,35 +1436,24 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
        *ppc = page_address(e->page);
        e->end += len;
 
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        if (cb)
                cb(*ppc, priv);
 
        list_add_tail(&e->list, &con->writequeue);
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
 
        return e;
 };
 
-void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc,
-                             void (*cb)(void *buf, void *priv), void *priv)
+static void *dlm_lowcomms_new_buffer_con(struct connection *con, int len,
+                                        gfp_t allocation, char **ppc,
+                                        void (*cb)(void *buf, void *priv),
+                                        void *priv)
 {
        struct writequeue_entry *e;
-       struct connection *con;
        struct dlm_msg *msg;
 
-       if (len > DEFAULT_BUFFER_SIZE ||
-           len < sizeof(struct dlm_header)) {
-               BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE);
-               log_print("failed to allocate a buffer of size %d", len);
-               WARN_ON(1);
-               return NULL;
-       }
-
-       con = nodeid2con(nodeid, allocation);
-       if (!con)
-               return NULL;
-
        msg = kzalloc(sizeof(*msg), allocation);
        if (!msg)
                return NULL;
@@ -1484,6 +1473,26 @@ void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc,
        return msg;
 }
 
+void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc,
+                             void (*cb)(void *buf, void *priv), void *priv)
+{
+       struct connection *con;
+
+       if (len > DEFAULT_BUFFER_SIZE ||
+           len < sizeof(struct dlm_header)) {
+               BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE);
+               log_print("failed to allocate a buffer of size %d", len);
+               WARN_ON(1);
+               return NULL;
+       }
+
+       con = nodeid2con(nodeid, allocation);
+       if (!con)
+               return NULL;
+
+       return dlm_lowcomms_new_buffer_con(con, len, allocation, ppc, cb, priv);
+}
+
 void dlm_lowcomms_commit_buffer(void *mh)
 {
        struct dlm_msg *msg = mh;
@@ -1491,7 +1500,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
        struct connection *con = e->con;
        int users;
 
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        list_add(&msg->list, &e->msgs);
        kref_get(&msg->ref);
 
@@ -1500,13 +1509,13 @@ void dlm_lowcomms_commit_buffer(void *mh)
                goto out;
 
        e->len = DLM_WQ_LENGTH_BYTES(e);
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
 
        queue_work(send_workqueue, &con->swork);
        return;
 
 out:
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
        return;
 }
 
@@ -1524,6 +1533,23 @@ void dlm_lowcomms_get_buffer(void *mh)
        kref_get(&msg->ref);
 }
 
+/* irqsafe */
+void dlm_lowcomms_resend_buffer(void *mh)
+{
+       struct dlm_msg *msg = mh;
+       void *mh_new;
+       char *ppc;
+
+       mh_new = dlm_lowcomms_new_buffer_con(msg->entry->con, msg->len, GFP_ATOMIC,
+                                            &ppc, NULL, NULL);
+       if (!mh_new)
+               return;
+
+       memcpy(ppc, msg->ppc, msg->len);
+       dlm_lowcomms_commit_buffer(mh_new);
+       dlm_lowcomms_put_buffer(mh_new);
+}
+
 /* Send a message */
 static void send_to_sock(struct connection *con)
 {
@@ -1537,7 +1563,7 @@ static void send_to_sock(struct connection *con)
        if (con->sock == NULL)
                goto out_connect;
 
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        for (;;) {
                if (list_empty(&con->writequeue))
                        break;
@@ -1546,7 +1572,7 @@ static void send_to_sock(struct connection *con)
                len = e->len;
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);
-               spin_unlock(&con->writequeue_lock);
+               spin_unlock_bh(&con->writequeue_lock);
 
                ret = 0;
                if (len) {
@@ -1574,10 +1600,10 @@ static void send_to_sock(struct connection *con)
                        count = 0;
                }
 
-               spin_lock(&con->writequeue_lock);
+               spin_lock_bh(&con->writequeue_lock);
                writequeue_entry_complete(e, ret);
        }
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
 out:
        mutex_unlock(&con->sock_mutex);
        return;
@@ -1600,11 +1626,11 @@ static void clean_one_writequeue(struct connection *con)
 {
        struct writequeue_entry *e, *safe;
 
-       spin_lock(&con->writequeue_lock);
+       spin_lock_bh(&con->writequeue_lock);
        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
                free_entry(e);
        }
-       spin_unlock(&con->writequeue_lock);
+       spin_unlock_bh(&con->writequeue_lock);
 }
 
 /* Called from recovery when it knows that a node has
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index fa735497dad8..345aed7e00cc 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -30,6 +30,7 @@ int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
 void dlm_lowcomms_put_buffer(void *mh);
 void dlm_lowcomms_get_buffer(void *mh);
+void dlm_lowcomms_resend_buffer(void *mh);
 
 #endif                         /* __LOWCOMMS_DOT_H__ */
 
-- 
2.26.3
