This patch change the dlm message parsing context from a workqueue to
a softirq context. This will hopefully speed up our dlm message
processing by removing a bunch of implicit scheduling points such a
cond_reched() depends on the preemption model setting. A softirq
(except PREEMPT_RT) can only be interrupted by other softirqs or
higher prio context such as hardware interrupts.

This patch will only move the dlm message parsing to the right context,
there exists more ideas to improve message parsing like using lockless
locking when doing read access on datastructures or enable a parallel
per node message processing. Further patches will improve those
behaviors. For now this patch will reduce the amount of interruptions
when doing DLM message parsing.

Signed-off-by: Alexander Aring <aahri...@redhat.com>
---
 fs/dlm/lowcomms.c | 34 ++++++++++------------------------
 1 file changed, 10 insertions(+), 24 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 28dd74aebc84..93f7e8827201 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -183,7 +183,6 @@ static int dlm_local_count;
 
 /* Work queues */
 static struct workqueue_struct *io_workqueue;
-static struct workqueue_struct *process_workqueue;
 
 static struct hlist_head connection_hash[CONN_HASH_SIZE];
 static DEFINE_SPINLOCK(connections_lock);
@@ -199,9 +198,9 @@ static const struct dlm_proto_ops *dlm_proto_ops;
 
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
-static void process_dlm_messages(struct work_struct *work);
+static void process_dlm_messages(struct tasklet_struct *tasklet);
 
-static DECLARE_WORK(process_work, process_dlm_messages);
+static DECLARE_TASKLET(process_tasklet, process_dlm_messages);
 static DEFINE_SPINLOCK(processqueue_lock);
 static bool process_dlm_messages_pending;
 static atomic_t processqueue_count;
@@ -863,7 +862,7 @@ struct dlm_processed_nodes {
        struct list_head list;
 };
 
-static void process_dlm_messages(struct work_struct *work)
+static void process_dlm_messages(struct tasklet_struct *tasklet)
 {
        struct processqueue_entry *pentry;
 
@@ -971,7 +970,7 @@ static int receive_from_sock(struct connection *con, int 
buflen)
        list_add_tail(&pentry->list, &processqueue);
        if (!process_dlm_messages_pending) {
                process_dlm_messages_pending = true;
-               queue_work(process_workqueue, &process_work);
+               tasklet_schedule(&process_tasklet);
        }
        spin_unlock_bh(&processqueue_lock);
 
@@ -1511,7 +1510,8 @@ static void process_recv_sockets(struct work_struct *work)
                /* CF_RECV_PENDING cleared */
                break;
        case DLM_IO_FLUSH:
-               flush_workqueue(process_workqueue);
+               tasklet_disable(&process_tasklet);
+               tasklet_enable(&process_tasklet);
                fallthrough;
        case DLM_IO_RESCHED:
                cond_resched();
@@ -1685,11 +1685,6 @@ static void work_stop(void)
                destroy_workqueue(io_workqueue);
                io_workqueue = NULL;
        }
-
-       if (process_workqueue) {
-               destroy_workqueue(process_workqueue);
-               process_workqueue = NULL;
-       }
 }
 
 static int work_start(void)
@@ -1701,18 +1696,6 @@ static int work_start(void)
                return -ENOMEM;
        }
 
-       /* ordered dlm message process queue,
-        * should be converted to a tasklet
-        */
-       process_workqueue = alloc_ordered_workqueue("dlm_process",
-                                                   WQ_HIGHPRI | 
WQ_MEM_RECLAIM);
-       if (!process_workqueue) {
-               log_print("can't start dlm_process");
-               destroy_workqueue(io_workqueue);
-               io_workqueue = NULL;
-               return -ENOMEM;
-       }
-
        return 0;
 }
 
@@ -1734,7 +1717,10 @@ void dlm_lowcomms_shutdown(void)
                hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
                        shutdown_connection(con, true);
                        stop_connection_io(con);
-                       flush_workqueue(process_workqueue);
+
+                       tasklet_disable(&process_tasklet);
+                       tasklet_enable(&process_tasklet);
+
                        close_connection(con, true);
 
                        clean_one_writequeue(con);
-- 
2.31.1

Reply via email to