This patch will do a retry on -EBUSY dlm API return value when it's not a
dlm_unlock() call with either DLM_LKF_CANCEL or DLM_LKF_FORCEUNLOCK. All
other API calls can occur a -EBUSY return value and the DLM user needs
to handle it as a retry again for now.

The reason why we need a -EBUSY in all other cases is because
dlm_recover_waiters_post() function in DLM. This function is happening
when dlm recovery recovers lock states. In some cases it will trigger
new lock requests as:

if (oc || ou) {
        ...
} else {
        switch (mstype) {
        case DLM_MSG_LOOKUP:
        case DLM_MSG_REQUEST:
                _request_lock(r, lkb);
                if (is_master(r))
                        confirm_master(r, 0);
                break;
        case DLM_MSG_CONVERT:
                _convert_lock(r, lkb);
                break;
        default:
                err = 1;
        }
}

The problem begins on what dlm recovery is doing afterwards. Those
requests are not synchronized and there could be pending messages around.
There exists a race between handling those messages, dlm unlocks dlm API
for accepting new dlm requests and the dlm user triggers new requests
immediate afterwards. If the DLM user triggers new requests it can clash
with the above requests triggered by DLM internally because the pending
messages are still around. This behaviour is unpredictable and the user
has only knowledge about it if new requests returning -EBUSY. For now we
need to add retry cases everywhere to retry lock requests if the above
race happens.

Signed-off-by: Alexander Aring <aahri...@redhat.com>
---

changes since v2:
 - fix fs/dlm to fs/gfs2 - sorry about that

 fs/gfs2/lock_dlm.c | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index 2559a79cf14b..5c2eba142e9e 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -519,8 +519,15 @@ static int sync_unlock(struct gfs2_sbd *sdp, struct 
dlm_lksb *lksb, char *name)
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
        int error;
 
+again:
        error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
-       if (error) {
+       switch (error) {
+       case 0:
+               break;
+       case -EBUSY:
+               msleep(20);
+               goto again;
+       default:
                fs_err(sdp, "%s lkid %x error %d\n",
                       name, lksb->sb_lkid, error);
                return error;
@@ -546,10 +553,17 @@ static int sync_lock(struct gfs2_sbd *sdp, int mode, 
uint32_t flags,
        memset(strname, 0, GDLM_STRNAME_BYTES);
        snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);
 
+again:
        error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
                         strname, GDLM_STRNAME_BYTES - 1,
                         0, sync_wait_cb, ls, NULL);
-       if (error) {
+       switch (error) {
+       case 0:
+               break;
+       case -EBUSY:
+               msleep(20);
+               goto again;
+       default:
                fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
                       name, lksb->sb_lkid, flags, mode, error);
                return error;
-- 
2.31.1

Reply via email to