Each time dlm_add_cb() queues work, or adds the lkb to the
ls->ls_cb_delay list for later queuing, it takes a reference on the
lkb. However, if the work is already queued, or the lkb is already on
the list, that reference must be dropped again. dlm_add_cb() can be
called multiple times before the matching dlm_callback_work() work
function runs and puts the reference. This patch reverts the
kref_get() whenever the lkb turns out to be already queued or already
on the delay list.
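
For reference, queue_work() returns false when the work item was
already pending, which is what lets dlm_add_cb() detect this case. A
minimal userspace sketch of the get/revert pattern (names and types
here are invented for illustration, not the kernel API):

  /* Minimal userspace analogue of the fix; all names are invented
   * for this sketch. Take a reference before trying to queue, and
   * revert it if the item was already queued.
   */
  #include <stdbool.h>
  #include <stdio.h>

  struct item {
          int refcount;
          bool pending;   /* models "work already queued" */
  };

  static bool try_queue(struct item *it)
  {
          if (it->pending)
                  return false;   /* like queue_work() returning false */
          it->pending = true;
          return true;
  }

  static void add_item(struct item *it)
  {
          it->refcount++;         /* the kref_get() */
          if (!try_queue(it))
                  it->refcount--; /* the reverting put */
  }

  int main(void)
  {
          struct item it = { .refcount = 1, .pending = false };

          add_item(&it);  /* queued; the extra reference is kept */
          add_item(&it);  /* already queued; reference reverted */
          printf("refcount=%d\n", it.refcount);   /* prints 2, not 3 */
          return 0;
  }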

In dlm_callback_resume() we need to ensure that the LSFL_CB_DELAY bit
is cleared only after all lkbs on ls->ls_cb_delay have been queued for
work. Since the ls->ls_cb_delay list exists solely to defer queuing,
an lkb taken off it should never have its work already queued; warn if
that ever happens.
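
The ordering can be modeled in userspace as below (a sketch with
invented names, using a pthread mutex in place of ls->ls_cb_mutex):
the flag is cleared under the same lock that guards the delay list,
and only once the list has fully drained, so a concurrent
dlm_add_cb() cannot park an lkb that would never be queued.

  #include <pthread.h>
  #include <stdbool.h>

  static pthread_mutex_t cb_mutex = PTHREAD_MUTEX_INITIALIZER;
  static bool cb_delay = true;    /* stands in for LSFL_CB_DELAY */
  static int delay_list_len = 5;  /* stands in for ls->ls_cb_delay */

  static void callback_resume(int max_batch)
  {
          pthread_mutex_lock(&cb_mutex);
          while (delay_list_len > 0 && max_batch-- > 0)
                  delay_list_len--;       /* "queue_work" a delayed lkb */
          if (delay_list_len == 0)        /* clear the flag only once */
                  cb_delay = false;       /* the list is empty */
          pthread_mutex_unlock(&cb_mutex);
  }

  int main(void)
  {
          callback_resume(3);     /* partial drain: flag must stay set */
          callback_resume(3);     /* list now empty: flag is cleared */
          return cb_delay ? 1 : 0;
  }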

Cc: sta...@vger.kernel.org
Signed-off-by: Alexander Aring <aahri...@redhat.com>
---
 fs/dlm/ast.c | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 0271796d36b1..68e09ed8234e 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -177,6 +177,7 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 {
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        uint64_t new_seq, prev_seq;
+       bool queued = true;
        int rv;
 
        spin_lock(&dlm_cb_seq_spin);
@@ -202,13 +203,19 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 
                mutex_lock(&ls->ls_cb_mutex);
                if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
-                       if (list_empty(&lkb->lkb_cb_list))
+                       if (list_empty(&lkb->lkb_cb_list)) {
                                list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
+                               queued = false;
+                       }
                } else {
-                       queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
+                       queued = !queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
                }
                mutex_unlock(&ls->ls_cb_mutex);
+
+               if (queued)
+                       dlm_put_lkb(lkb);
        }
+
  out:
        mutex_unlock(&lkb->lkb_cb_mutex);
 }
@@ -303,9 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
 {
        struct dlm_lkb *lkb, *safe;
        int count = 0, sum = 0;
-       bool empty;
-
-       clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
+       bool empty, queued;
 
        if (!ls->ls_callback_wq)
                return;
@@ -314,12 +319,16 @@ void dlm_callback_resume(struct dlm_ls *ls)
        mutex_lock(&ls->ls_cb_mutex);
        list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
                list_del_init(&lkb->lkb_cb_list);
-               queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
+               queued = queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
+               WARN_ON_ONCE(!queued);
+
                count++;
                if (count == MAX_CB_QUEUE)
                        break;
        }
        empty = list_empty(&ls->ls_cb_delay);
+       if (empty)
+               clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
        mutex_unlock(&ls->ls_cb_mutex);
 
        sum += count;
-- 
2.31.1
