This patch introduces a irqsafe retransmit functionality for a lowcomms message handle. It's just allocates a new buffer and transmit it again, no special handling about prioritize it because keeping bytestream in order.
To avoid another connection look some refactor was done to make a new buffer allocation with a preexisting connection pointer. Signed-off-by: Alexander Aring <aahri...@redhat.com> --- fs/dlm/lowcomms.c | 84 +++++++++++++++++++++++++++++++---------------- fs/dlm/lowcomms.h | 1 + 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index ba782ea84281..d2be58496fd0 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -1409,7 +1409,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, { struct writequeue_entry *e; - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); if (!list_empty(&con->writequeue)) { e = list_last_entry(&con->writequeue, struct writequeue_entry, list); if (DLM_WQ_REMAIN_BYTES(e) >= len) { @@ -1421,12 +1421,12 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, e->end += len; e->users++; - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); return e; } } - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); e = new_writequeue_entry(con, allocation); if (!e) @@ -1436,35 +1436,24 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, *ppc = page_address(e->page); e->end += len; - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); if (cb) cb(*ppc, priv); list_add_tail(&e->list, &con->writequeue); - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); return e; }; -void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc, - void (*cb)(void *buf, void *priv), void *priv) +static void *dlm_lowcomms_new_buffer_con(struct connection *con, int len, + gfp_t allocation, char **ppc, + void (*cb)(void *buf, void *priv), + void *priv) { struct writequeue_entry *e; - struct connection *con; struct dlm_msg *msg; - if (len > DEFAULT_BUFFER_SIZE || - len < sizeof(struct dlm_header)) { - BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE); - log_print("failed to allocate a buffer of size %d", len); - WARN_ON(1); - return NULL; - } - - con = nodeid2con(nodeid, allocation); - if (!con) - return NULL; - msg = kzalloc(sizeof(*msg), allocation); if (!msg) return NULL; @@ -1484,6 +1473,26 @@ void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc, return msg; } +void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc, + void (*cb)(void *buf, void *priv), void *priv) +{ + struct connection *con; + + if (len > DEFAULT_BUFFER_SIZE || + len < sizeof(struct dlm_header)) { + BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE); + log_print("failed to allocate a buffer of size %d", len); + WARN_ON(1); + return NULL; + } + + con = nodeid2con(nodeid, allocation); + if (!con) + return NULL; + + return dlm_lowcomms_new_buffer_con(con, len, GFP_ATOMIC, ppc, cb, priv); +} + void dlm_lowcomms_commit_buffer(void *mh) { struct dlm_msg *msg = mh; @@ -1491,7 +1500,7 @@ void dlm_lowcomms_commit_buffer(void *mh) struct connection *con = e->con; int users; - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); list_add(&msg->list, &e->msgs); kref_get(&msg->ref); @@ -1500,13 +1509,13 @@ void dlm_lowcomms_commit_buffer(void *mh) goto out; e->len = DLM_WQ_LENGTH_BYTES(e); - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); queue_work(send_workqueue, &con->swork); return; out: - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); return; } @@ -1524,6 +1533,23 @@ void dlm_lowcomms_get_buffer(void *mh) kref_get(&msg->ref); } +/* irqsafe */ +void dlm_lowcomms_resend_buffer(void *mh) +{ + struct dlm_msg *msg = mh; + void *mh_new; + char *ppc; + + mh_new = dlm_lowcomms_new_buffer_con(msg->entry->con, msg->len, GFP_ATOMIC, + &ppc, NULL, NULL); + if (!mh_new) + return; + + memcpy(ppc, msg->ppc, msg->len); + dlm_lowcomms_commit_buffer(mh_new); + dlm_lowcomms_put_buffer(mh_new); +} + /* Send a message */ static void send_to_sock(struct connection *con) { @@ -1537,7 +1563,7 @@ static void send_to_sock(struct connection *con) if (con->sock == NULL) goto out_connect; - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); for (;;) { if (list_empty(&con->writequeue)) break; @@ -1546,7 +1572,7 @@ static void send_to_sock(struct connection *con) len = e->len; offset = e->offset; BUG_ON(len == 0 && e->users == 0); - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); ret = 0; if (len) { @@ -1574,10 +1600,10 @@ static void send_to_sock(struct connection *con) count = 0; } - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); writequeue_entry_complete(e, ret); } - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); out: mutex_unlock(&con->sock_mutex); return; @@ -1600,11 +1626,11 @@ static void clean_one_writequeue(struct connection *con) { struct writequeue_entry *e, *safe; - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); list_for_each_entry_safe(e, safe, &con->writequeue, list) { free_entry(e); } - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); } /* Called from recovery when it knows that a node has diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index fa735497dad8..345aed7e00cc 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -30,6 +30,7 @@ int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); void dlm_lowcomms_put_buffer(void *mh); void dlm_lowcomms_get_buffer(void *mh); +void dlm_lowcomms_resend_buffer(void *mh); #endif /* __LOWCOMMS_DOT_H__ */ -- 2.26.3