When a node withdraws from a file system, it often leaves its journal
in an incomplete state. This is especially true when the withdraw is
caused by I/O errors writing to the journal. Before this patch, a
withdraw would try to write a "shutdown" record to the journal and tell
dlm it's done with the file system, while none of the other nodes
would know about the problem. Later, when the problem is fixed and the
withdrawn node is rebooted, it would then discover that its own
journal was incomplete, and replay it. However, replaying it at this
point is almost guaranteed to introduce corruption because the other
nodes are likely to have used affected resource groups that appeared
in the journal since the time of the withdraw. Replaying the journal
later will overwrite any changes made, and not through any fault of
dlm, which was instructed during the withdraw to release those
resources.

This patch makes file system withdraws visible to the entire cluster.
Withdrawing nodes dequeue their journal glock to allow recovery.

The remaining nodes check all the journals to see if they are
clean or in need of replay. They try to replay dirty journals, but
only the journals of withdrawn nodes will be "not busy" and
therefore available for replay.

Until the journal replay is complete, no i/o related glocks may be
given out, to ensure that the replay does not cause the
aforementioned corruption: We cannot allow any journal replay to
overwrite blocks associated with a glock once it is held. The
glocks not affected by a withdraw are permitted to be passed
around as normal during a withdraw. A new glops flag, called
GLOF_OK_AT_WITHDRAW, indicates glocks that may be passed around
freely while a withdraw is taking place.

One such glock is the "live" glock which is now used to signal when
a withdraw occurs. When a withdraw occurs, the node signals its
withdraw by dequeueing the "live" glock and trying to enqueue it
in EX mode, thus forcing the other nodes to all see a demote
request, by way of a "1CB" (one callback) try lock. The "live"
glock is not granted in EX; the callback is used only to
indicate that a withdraw has occurred.

Note that all nodes in the cluster must wait for the recovering
node to finish replaying the withdrawing node's journal before
continuing. To this end, the withdrawing node repeatedly checks,
in a retry loop, whether its journal is clean.

Signed-off-by: Bob Peterson <rpete...@redhat.com>
---
 fs/gfs2/glock.c      |  35 ++++++++--
 fs/gfs2/glock.h      |   1 +
 fs/gfs2/glops.c      |  61 +++++++++++++++++-
 fs/gfs2/incore.h     |   6 ++
 fs/gfs2/lock_dlm.c   |  32 ++++++++++
 fs/gfs2/log.c        |  22 +++++--
 fs/gfs2/meta_io.c    |   2 +-
 fs/gfs2/ops_fstype.c |  48 ++------------
 fs/gfs2/super.c      |  24 ++++---
 fs/gfs2/super.h      |   1 +
 fs/gfs2/util.c       | 148 ++++++++++++++++++++++++++++++++++++++++++-
 fs/gfs2/util.h       |   3 +
 12 files changed, 315 insertions(+), 68 deletions(-)

diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index c6d6e478f5e3..20fb6cdf7829 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -242,7 +242,8 @@ static void __gfs2_glock_put(struct gfs2_glock *gl)
        gfs2_glock_remove_from_lru(gl);
        spin_unlock(&gl->gl_lockref.lock);
        GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
-       GLOCK_BUG_ON(gl, mapping && mapping->nrpages);
+       GLOCK_BUG_ON(gl, mapping && mapping->nrpages &&
+                    !test_bit(SDF_SHUTDOWN, &sdp->sd_flags));
        trace_gfs2_glock_put(gl);
        sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
 }
@@ -543,6 +544,8 @@ __acquires(&gl->gl_lockref.lock)
        int ret;
 
        if (unlikely(withdrawn(sdp)) &&
+           !(glops->go_flags & GLOF_OK_AT_WITHDRAW) &&
+           (gh && !(LM_FLAG_NOEXP & gh->gh_flags)) &&
            target != LM_ST_UNLOCKED)
                return;
        lck_flags &= (LM_FLAG_TRY | LM_FLAG_TRY_1CB | LM_FLAG_NOEXP |
@@ -561,9 +564,10 @@ __acquires(&gl->gl_lockref.lock)
            (lck_flags & (LM_FLAG_TRY|LM_FLAG_TRY_1CB)))
                clear_bit(GLF_BLOCKING, &gl->gl_flags);
        spin_unlock(&gl->gl_lockref.lock);
-       if (glops->go_sync)
+       if (glops->go_sync && !test_bit(SDF_SHUTDOWN, &sdp->sd_flags))
                glops->go_sync(gl);
-       if (test_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags))
+       if (test_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags) &&
+           !test_bit(SDF_SHUTDOWN, &sdp->sd_flags))
                glops->go_inval(gl, target == LM_ST_DEFERRED ? 0 : 
DIO_METADATA);
        clear_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags);
 
@@ -1091,7 +1095,8 @@ int gfs2_glock_nq(struct gfs2_holder *gh)
        struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
        int error = 0;
 
-       if (unlikely(withdrawn(sdp)))
+       if (unlikely(withdrawn(sdp) && !(LM_FLAG_NOEXP & gh->gh_flags) &&
+                    !(gl->gl_ops->go_flags & GLOF_OK_AT_WITHDRAW)))
                return -EIO;
 
        if (test_bit(GLF_LRU, &gl->gl_flags))
@@ -1135,11 +1140,28 @@ int gfs2_glock_poll(struct gfs2_holder *gh)
 void gfs2_glock_dq(struct gfs2_holder *gh)
 {
        struct gfs2_glock *gl = gh->gh_gl;
+       struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
        const struct gfs2_glock_operations *glops = gl->gl_ops;
        unsigned delay = 0;
        int fast_path = 0;
 
        spin_lock(&gl->gl_lockref.lock);
+       /**
+        * If we're in the process of file system withdraw, we cannot just
+        * dequeue any glocks until our journal is recovered, lest we
+        * introduce file system corruption. We need two exceptions to this
+        * rule: we need to allow unlocking of (1) nondisk glocks and (2) the
+        * glock for our own journal that needs recovery.
+        */
+       if (test_bit(SDF_SHUTDOWN, &sdp->sd_flags) &&
+           test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags) &&
+           !(gl->gl_ops->go_flags & GLOF_OK_AT_WITHDRAW) &&
+           gh != &sdp->sd_jinode_gh) {
+               sdp->sd_glock_dqs_held++;
+               might_sleep();
+               wait_on_bit(&sdp->sd_flags, SDF_WITHDRAW_RECOVERY,
+                           TASK_UNINTERRUPTIBLE);
+       }
        if (gh->gh_flags & GL_NOCACHE)
                handle_callback(gl, LM_ST_UNLOCKED, 0, false);
 
@@ -1619,6 +1641,11 @@ static void dump_glock_func(struct gfs2_glock *gl)
        dump_glock(NULL, gl);
 }
 
+void gfs2_gl_flushwork(struct gfs2_sbd *sdp)
+{
+       flush_workqueue(glock_workqueue);
+}
+
 /**
  * gfs2_gl_hash_clear - Empty out the glock hash table
  * @sdp: the filesystem
diff --git a/fs/gfs2/glock.h b/fs/gfs2/glock.h
index 936b3295839c..c1c40e2dbd96 100644
--- a/fs/gfs2/glock.h
+++ b/fs/gfs2/glock.h
@@ -202,6 +202,7 @@ extern int gfs2_glock_nq_num(struct gfs2_sbd *sdp, u64 
number,
                             struct gfs2_holder *gh);
 extern int gfs2_glock_nq_m(unsigned int num_gh, struct gfs2_holder *ghs);
 extern void gfs2_glock_dq_m(unsigned int num_gh, struct gfs2_holder *ghs);
+extern void gfs2_gl_flushwork(struct gfs2_sbd *sdp);
 extern void gfs2_dump_glock(struct seq_file *seq, struct gfs2_glock *gl);
 #define GLOCK_BUG_ON(gl,x) do { if (unlikely(x)) { gfs2_dump_glock(NULL, gl); 
BUG(); } } while(0)
 extern __printf(2, 3)
diff --git a/fs/gfs2/glops.c b/fs/gfs2/glops.c
index 4b0e52bf5825..f372a6f169a2 100644
--- a/fs/gfs2/glops.c
+++ b/fs/gfs2/glops.c
@@ -32,6 +32,8 @@
 
 struct workqueue_struct *gfs2_freeze_wq;
 
+extern struct workqueue_struct *gfs2_control_wq;
+
 static void gfs2_ail_error(struct gfs2_glock *gl, const struct buffer_head *bh)
 {
        fs_err(gl->gl_name.ln_sbd,
@@ -396,6 +398,7 @@ static int gfs2_dinode_in(struct gfs2_inode *ip, const void 
*buf)
        return 0;
 corrupt:
        gfs2_consist_inode(ip);
+       printk("gah2");
        return -EIO;
 }
 
@@ -584,8 +587,58 @@ static void iopen_go_callback(struct gfs2_glock *gl, bool 
remote)
        }
 }
 
+/**
+ * nondisk_go_callback - used to signal when a node did a withdraw
+ * @gl: the nondisk glock
+ * @remote: true if this came from a different cluster node
+ *
+ */
+static void nondisk_go_callback(struct gfs2_glock *gl, bool remote)
+{
+       struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+
+       /* Ignore the callback unless it's from another node, and it's the
+          live lock. */
+       if (!remote || gl->gl_name.ln_number != GFS2_LIVE_LOCK)
+               return;
+
+       /* First order of business is to cancel the demote request. We don't
+        * really want to demote a nondisk glock. At best it's just to inform
+        * us of another node's withdraw. We'll keep it in SH mode. */
+       clear_bit(GLF_DEMOTE, &gl->gl_flags);
+       clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
+
+       /* Ignore the unlock if we're withdrawn, unmounting, or in recovery. */
+       if (test_bit(SDF_NORECOVERY, &sdp->sd_flags) ||
+           test_bit(SDF_SHUTDOWN, &sdp->sd_flags) ||
+           test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags))
+               return;
+
+       /* We only care when a node wants us to unlock, because that means
+        * they want a journal recovered. */
+       if (gl->gl_demote_state != LM_ST_UNLOCKED)
+               return;
+
+       if (sdp->sd_args.ar_spectator) {
+               fs_warn(sdp, "Spectator node cannot recover journals.\n");
+               return;
+       }
+
+       fs_warn(sdp, "Some node has withdrawn; checking for recovery.\n");
+       set_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
+       /**
+        * We can't call remote_withdraw directly here or gfs2_recover_journal
+        * because this is called from the glock unlock function and the
+        * remote_withdraw needs to enqueue and dequeue the same "live" glock
+        * we were called from. So we queue it to the control work queue in
+        * lock_dlm.
+        */
+       queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
+}
+
 const struct gfs2_glock_operations gfs2_meta_glops = {
        .go_type = LM_TYPE_META,
+       .go_flags = GLOF_OK_AT_WITHDRAW,
 };
 
 const struct gfs2_glock_operations gfs2_inode_glops = {
@@ -613,6 +666,7 @@ const struct gfs2_glock_operations gfs2_freeze_glops = {
        .go_xmote_bh = freeze_go_xmote_bh,
        .go_demote_ok = freeze_go_demote_ok,
        .go_type = LM_TYPE_NONDISK,
+       .go_flags = GLOF_OK_AT_WITHDRAW,
 };
 
 const struct gfs2_glock_operations gfs2_iopen_glops = {
@@ -623,20 +677,23 @@ const struct gfs2_glock_operations gfs2_iopen_glops = {
 
 const struct gfs2_glock_operations gfs2_flock_glops = {
        .go_type = LM_TYPE_FLOCK,
-       .go_flags = GLOF_LRU,
+       .go_flags = GLOF_LRU | GLOF_OK_AT_WITHDRAW,
 };
 
 const struct gfs2_glock_operations gfs2_nondisk_glops = {
        .go_type = LM_TYPE_NONDISK,
+       .go_callback = nondisk_go_callback,
+       .go_flags = GLOF_OK_AT_WITHDRAW,
 };
 
 const struct gfs2_glock_operations gfs2_quota_glops = {
        .go_type = LM_TYPE_QUOTA,
-       .go_flags = GLOF_LVB | GLOF_LRU,
+       .go_flags = GLOF_LVB | GLOF_LRU | GLOF_OK_AT_WITHDRAW,
 };
 
 const struct gfs2_glock_operations gfs2_journal_glops = {
        .go_type = LM_TYPE_JOURNAL,
+       .go_flags = GLOF_OK_AT_WITHDRAW,
 };
 
 const struct gfs2_glock_operations *gfs2_glops_list[] = {
diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h
index 8380d4db8be6..2ddae1326ce2 100644
--- a/fs/gfs2/incore.h
+++ b/fs/gfs2/incore.h
@@ -250,6 +250,7 @@ struct gfs2_glock_operations {
 #define GLOF_ASPACE 1
 #define GLOF_LVB    2
 #define GLOF_LRU    4
+#define GLOF_OK_AT_WITHDRAW 8
 };
 
 enum {
@@ -622,6 +623,9 @@ enum {
        SDF_FORCE_AIL_FLUSH     = 9,
        SDF_AIL1_IO_ERROR       = 10,
        SDF_PENDING_WITHDRAW    = 11, /* Will withdraw eventually */
+       SDF_REMOTE_WITHDRAW     = 12, /* Performing remote recovery */
+       SDF_WITHDRAW_RECOVERY   = 13, /* Wait for journal recovery when we are
+                                        withdrawing */
 };
 
 enum gfs2_freeze_state {
@@ -770,6 +774,7 @@ struct gfs2_sbd {
        struct gfs2_jdesc *sd_jdesc;
        struct gfs2_holder sd_journal_gh;
        struct gfs2_holder sd_jinode_gh;
+       struct gfs2_glock *sd_jinode_gl;
 
        struct gfs2_holder sd_sc_gh;
        struct gfs2_holder sd_qc_gh;
@@ -854,6 +859,7 @@ struct gfs2_sbd {
 
        unsigned long sd_last_warning;
        struct dentry *debugfs_dir;    /* debugfs directory */
+       unsigned long sd_glock_dqs_held;
 };
 
 static inline void gfs2_glstats_inc(struct gfs2_glock *gl, int which)
diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index d2cb2fe1c3f3..619d7a0e8ac1 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -19,6 +19,8 @@
 
 #include "incore.h"
 #include "glock.h"
+#include "glops.h"
+#include "recovery.h"
 #include "util.h"
 #include "sys.h"
 #include "trace_gfs2.h"
@@ -325,6 +327,7 @@ static void gdlm_cancel(struct gfs2_glock *gl)
 /*
  * dlm/gfs2 recovery coordination using dlm_recover callbacks
  *
+ *  0. gfs2 checks for another cluster node withdraw, needing journal replay
  *  1. dlm_controld sees lockspace members change
  *  2. dlm_controld blocks dlm-kernel locking activity
  *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
@@ -573,6 +576,28 @@ static int control_lock(struct gfs2_sbd *sdp, int mode, 
uint32_t flags)
                         &ls->ls_control_lksb, "control_lock");
 }
 
+/**
+ * remote_withdraw - react to a node withdrawing from the file system
+ * @sdp: The superblock
+ */
+static void remote_withdraw(struct gfs2_sbd *sdp)
+{
+       struct gfs2_jdesc *jd;
+       int ret = 0, count = 0;
+
+       list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
+               if (jd->jd_jid == sdp->sd_lockstruct.ls_jid)
+                       continue;
+               ret = gfs2_recover_journal(jd, true);
+               if (ret)
+                       break;
+               count++;
+       }
+
+       /* Report how many journals were checked and the final status */
+       fs_err(sdp, "Journals checked: %d, ret = %d.\n", count, ret);
+}
+
 static void gfs2_control_func(struct work_struct *work)
 {
        struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, 
sd_control_work.work);
@@ -583,6 +608,13 @@ static void gfs2_control_func(struct work_struct *work)
        int recover_size;
        int i, error;
 
+       /* First check for other nodes that may have done a withdraw. */
+       if (test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
+               remote_withdraw(sdp);
+               clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
+               return;
+       }
+
        spin_lock(&ls->ls_recover_spin);
        /*
         * No MOUNT_DONE means we're still mounting; control_mount()
diff --git a/fs/gfs2/log.c b/fs/gfs2/log.c
index ec8675113b0d..81550038ace3 100644
--- a/fs/gfs2/log.c
+++ b/fs/gfs2/log.c
@@ -107,7 +107,7 @@ __acquires(&sdp->sd_ail_lock)
                gfs2_assert(sdp, bd->bd_tr == tr);
 
                if (!buffer_busy(bh)) {
-                       if (!buffer_uptodate(bh) &&
+                       if (!buffer_uptodate(bh) && !withdrawn(sdp) &&
                            !test_and_set_bit(SDF_AIL1_IO_ERROR,
                                              &sdp->sd_flags)) {
                                gfs2_io_error_bh(sdp, bh);
@@ -205,7 +205,7 @@ static void gfs2_ail1_empty_one(struct gfs2_sbd *sdp, 
struct gfs2_trans *tr)
                gfs2_assert(sdp, bd->bd_tr == tr);
                if (buffer_busy(bh))
                        continue;
-               if (!buffer_uptodate(bh) &&
+               if (!buffer_uptodate(bh) && !withdrawn(sdp) &&
                    !test_and_set_bit(SDF_AIL1_IO_ERROR, &sdp->sd_flags)) {
                        gfs2_io_error_bh(sdp, bh);
                        set_bit(SDF_PENDING_WITHDRAW, &sdp->sd_flags);
@@ -747,6 +747,10 @@ static void log_write_header(struct gfs2_sbd *sdp, u32 
flags)
        int op_flags = REQ_PREFLUSH | REQ_FUA | REQ_META | REQ_SYNC;
        enum gfs2_freeze_state state = atomic_read(&sdp->sd_freeze_state);
 
+       if (test_bit(SDF_SHUTDOWN, &sdp->sd_flags)) {
+               log_flush_wait(sdp);
+               return;
+       }
        gfs2_assert_withdraw(sdp, (state != SFS_FROZEN));
        tail = current_tail(sdp);
 
@@ -776,6 +780,8 @@ void gfs2_log_flush(struct gfs2_sbd *sdp, struct gfs2_glock 
*gl, u32 flags)
        struct gfs2_trans *tr;
        enum gfs2_freeze_state state = atomic_read(&sdp->sd_freeze_state);
 
+       if (!test_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags))
+               return;
        down_write(&sdp->sd_log_flush_lock);
 
        /* Log might have been flushed while we waited for the flush lock */
@@ -1003,8 +1009,10 @@ int gfs2_logd(void *data)
                did_flush = false;
                if (gfs2_jrnl_flush_reqd(sdp) || t == 0) {
                        gfs2_ail1_empty(sdp);
-                       gfs2_log_flush(sdp, NULL, GFS2_LOG_HEAD_FLUSH_NORMAL |
-                                      GFS2_LFC_LOGD_JFLUSH_REQD);
+                       if (test_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags))
+                               gfs2_log_flush(sdp, NULL,
+                                              GFS2_LOG_HEAD_FLUSH_NORMAL |
+                                              GFS2_LFC_LOGD_JFLUSH_REQD);
                        did_flush = true;
                }
 
@@ -1012,8 +1020,10 @@ int gfs2_logd(void *data)
                        gfs2_ail1_start(sdp);
                        gfs2_ail1_wait(sdp);
                        gfs2_ail1_empty(sdp);
-                       gfs2_log_flush(sdp, NULL, GFS2_LOG_HEAD_FLUSH_NORMAL |
-                                      GFS2_LFC_LOGD_AIL_FLUSH_REQD);
+                       if (test_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags))
+                               gfs2_log_flush(sdp, NULL,
+                                              GFS2_LOG_HEAD_FLUSH_NORMAL |
+                                              GFS2_LFC_LOGD_AIL_FLUSH_REQD);
                        did_flush = true;
                }
 
diff --git a/fs/gfs2/meta_io.c b/fs/gfs2/meta_io.c
index 97c161782763..39a6cc84a908 100644
--- a/fs/gfs2/meta_io.c
+++ b/fs/gfs2/meta_io.c
@@ -254,7 +254,7 @@ int gfs2_meta_read(struct gfs2_glock *gl, u64 blkno, int 
flags,
        struct buffer_head *bh, *bhs[2];
        int num = 0;
 
-       if (unlikely(withdrawn(sdp))) {
+       if (unlikely(withdrawn(sdp)) && gl != sdp->sd_jinode_gl) {
                *bhp = NULL;
                return -EIO;
        }
diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c
index 402201978312..650e841f2e44 100644
--- a/fs/gfs2/ops_fstype.c
+++ b/fs/gfs2/ops_fstype.c
@@ -591,48 +591,6 @@ static int gfs2_jindex_hold(struct gfs2_sbd *sdp, struct 
gfs2_holder *ji_gh)
        return error;
 }
 
-/**
- * check_journal_clean - Make sure a journal is clean for a spectator mount
- * @sdp: The GFS2 superblock
- * @jd: The journal descriptor
- *
- * Returns: 0 if the journal is clean or locked, else an error
- */
-static int check_journal_clean(struct gfs2_sbd *sdp, struct gfs2_jdesc *jd)
-{
-       int error;
-       struct gfs2_holder j_gh;
-       struct gfs2_log_header_host head;
-       struct gfs2_inode *ip;
-
-       ip = GFS2_I(jd->jd_inode);
-       error = gfs2_glock_nq_init(ip->i_gl, LM_ST_SHARED, LM_FLAG_NOEXP |
-                                  GL_EXACT | GL_NOCACHE, &j_gh);
-       if (error) {
-               fs_err(sdp, "Error locking journal for spectator mount.\n");
-               return -EPERM;
-       }
-       error = gfs2_jdesc_check(jd);
-       if (error) {
-               fs_err(sdp, "Error checking journal for spectator mount.\n");
-               goto out_unlock;
-       }
-       error = gfs2_find_jhead(jd, &head);
-       if (error) {
-               fs_err(sdp, "Error parsing journal for spectator mount.\n");
-               goto out_unlock;
-       }
-       if (!(head.lh_flags & GFS2_LOG_HEAD_UNMOUNT)) {
-               error = -EPERM;
-               fs_err(sdp, "jid=%u: Journal is dirty, so the first mounter "
-                      "must not be a spectator.\n", jd->jd_jid);
-       }
-
-out_unlock:
-       gfs2_glock_dq_uninit(&j_gh);
-       return error;
-}
-
 static int init_journal(struct gfs2_sbd *sdp, int undo)
 {
        struct inode *master = d_inode(sdp->sd_master_dir);
@@ -685,7 +643,8 @@ static int init_journal(struct gfs2_sbd *sdp, int undo)
 
                error = gfs2_glock_nq_num(sdp, sdp->sd_lockstruct.ls_jid,
                                          &gfs2_journal_glops,
-                                         LM_ST_EXCLUSIVE, LM_FLAG_NOEXP,
+                                         LM_ST_EXCLUSIVE,
+                                         LM_FLAG_NOEXP | GL_NOCACHE,
                                          &sdp->sd_journal_gh);
                if (error) {
                        fs_err(sdp, "can't acquire journal glock: %d\n", error);
@@ -693,6 +652,7 @@ static int init_journal(struct gfs2_sbd *sdp, int undo)
                }
 
                ip = GFS2_I(sdp->sd_jdesc->jd_inode);
+               sdp->sd_jinode_gl = ip->i_gl;
                error = gfs2_glock_nq_init(ip->i_gl, LM_ST_SHARED,
                                           LM_FLAG_NOEXP | GL_EXACT | 
GL_NOCACHE,
                                           &sdp->sd_jinode_gh);
@@ -723,7 +683,7 @@ static int init_journal(struct gfs2_sbd *sdp, int undo)
                        struct gfs2_jdesc *jd = gfs2_jdesc_find(sdp, x);
 
                        if (sdp->sd_args.ar_spectator) {
-                               error = check_journal_clean(sdp, jd);
+                               error = check_journal_clean(sdp, jd, true);
                                if (error)
                                        goto fail_jinode_gh;
                                continue;
diff --git a/fs/gfs2/super.c b/fs/gfs2/super.c
index 8033f24e0ad0..ebb11165a1b1 100644
--- a/fs/gfs2/super.c
+++ b/fs/gfs2/super.c
@@ -841,11 +841,12 @@ static void gfs2_dirty_inode(struct inode *inode, int 
flags)
 /**
  * gfs2_make_fs_ro - Turn a Read-Write FS into a Read-Only one
  * @sdp: the filesystem
+ * @withdrawing: if 1, we're withdrawing so only do what's necessary
  *
  * Returns: errno
  */
 
-static int gfs2_make_fs_ro(struct gfs2_sbd *sdp)
+int gfs2_make_fs_ro(struct gfs2_sbd *sdp, int withdrawing)
 {
        struct gfs2_holder freeze_gh;
        int error;
@@ -859,11 +860,12 @@ static int gfs2_make_fs_ro(struct gfs2_sbd *sdp)
        kthread_stop(sdp->sd_quotad_process);
        kthread_stop(sdp->sd_logd_process);
 
-       gfs2_quota_sync(sdp->sd_vfs, 0);
-       gfs2_statfs_sync(sdp->sd_vfs, 0);
-
-       gfs2_log_flush(sdp, NULL, GFS2_LOG_HEAD_FLUSH_SHUTDOWN |
-                      GFS2_LFC_MAKE_FS_RO);
+       if (!withdrawing) {
+               gfs2_quota_sync(sdp->sd_vfs, 0);
+               gfs2_statfs_sync(sdp->sd_vfs, 0);
+               gfs2_log_flush(sdp, NULL, GFS2_LOG_HEAD_FLUSH_SHUTDOWN |
+                              GFS2_LFC_MAKE_FS_RO);
+       }
        wait_event(sdp->sd_reserving_log_wait, 
atomic_read(&sdp->sd_reserving_log) == 0);
        gfs2_assert_warn(sdp, atomic_read(&sdp->sd_log_blks_free) == 
sdp->sd_jdesc->jd_blocks);
 
@@ -905,7 +907,7 @@ static void gfs2_put_super(struct super_block *sb)
        spin_unlock(&sdp->sd_jindex_spin);
 
        if (!sb_rdonly(sb)) {
-               error = gfs2_make_fs_ro(sdp);
+               error = gfs2_make_fs_ro(sdp, 0);
                if (error)
                        gfs2_io_error(sdp);
        }
@@ -922,8 +924,10 @@ static void gfs2_put_super(struct super_block *sb)
        gfs2_glock_put(sdp->sd_freeze_gl);
 
        if (!sdp->sd_args.ar_spectator) {
-               gfs2_glock_dq_uninit(&sdp->sd_journal_gh);
-               gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
+               if (gfs2_holder_initialized(&sdp->sd_journal_gh))
+                       gfs2_glock_dq_uninit(&sdp->sd_journal_gh);
+               if (gfs2_holder_initialized(&sdp->sd_jinode_gh))
+                       gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
                gfs2_glock_dq_uninit(&sdp->sd_sc_gh);
                gfs2_glock_dq_uninit(&sdp->sd_qc_gh);
                iput(sdp->sd_sc_inode);
@@ -1271,7 +1275,7 @@ static int gfs2_remount_fs(struct super_block *sb, int 
*flags, char *data)
 
        if ((sb->s_flags ^ *flags) & SB_RDONLY) {
                if (*flags & SB_RDONLY)
-                       error = gfs2_make_fs_ro(sdp);
+                       error = gfs2_make_fs_ro(sdp, 0);
                else
                        error = gfs2_make_fs_rw(sdp);
                if (error)
diff --git a/fs/gfs2/super.h b/fs/gfs2/super.h
index 73c97dccae21..e859c6d5bb3e 100644
--- a/fs/gfs2/super.h
+++ b/fs/gfs2/super.h
@@ -45,6 +45,7 @@ extern void gfs2_statfs_change_in(struct 
gfs2_statfs_change_host *sc,
 extern void update_statfs(struct gfs2_sbd *sdp, struct buffer_head *m_bh,
                          struct buffer_head *l_bh);
 extern int gfs2_statfs_sync(struct super_block *sb, int type);
+extern int gfs2_make_fs_ro(struct gfs2_sbd *sdp, int withdrawing);
 extern void gfs2_freeze_func(struct work_struct *work);
 
 extern struct file_system_type gfs2_fs_type;
diff --git a/fs/gfs2/util.c b/fs/gfs2/util.c
index ca6de80b5e8b..75f67284bba8 100644
--- a/fs/gfs2/util.c
+++ b/fs/gfs2/util.c
@@ -14,12 +14,17 @@
 #include <linux/buffer_head.h>
 #include <linux/crc32.h>
 #include <linux/gfs2_ondisk.h>
+#include <linux/delay.h>
 #include <linux/uaccess.h>
 
 #include "gfs2.h"
 #include "incore.h"
 #include "glock.h"
+#include "log.h"
+#include "lops.h"
+#include "recovery.h"
 #include "rgrp.h"
+#include "super.h"
 #include "util.h"
 
 struct kmem_cache *gfs2_glock_cachep __read_mostly;
@@ -36,6 +41,145 @@ void gfs2_assert_i(struct gfs2_sbd *sdp)
        fs_emerg(sdp, "fatal assertion failed\n");
 }
 
+/**
+ * check_journal_clean - Make sure a journal is clean for a spectator mount
+ * @sdp: The GFS2 superblock
+ * @jd: The journal descriptor
+ *
+ * Returns: 0 if the journal is clean or locked, else an error
+ */
+int check_journal_clean(struct gfs2_sbd *sdp, struct gfs2_jdesc *jd,
+                       bool verbose)
+{
+       int error;
+       struct gfs2_log_header_host head;
+       struct gfs2_inode *ip;
+
+       ip = GFS2_I(jd->jd_inode);
+       error = gfs2_glock_nq_init(ip->i_gl, LM_ST_SHARED, LM_FLAG_NOEXP |
+                                  GL_EXACT | GL_NOCACHE, &sdp->sd_jinode_gh);
+       if (error) {
+               if (verbose)
+                       fs_err(sdp, "Error %d locking journal for spectator "
+                              "mount.\n", error);
+               return -EPERM;
+       }
+       error = gfs2_jdesc_check(jd);
+       if (error) {
+               if (verbose)
+                       fs_err(sdp, "Error checking journal for spectator "
+                              "mount.\n");
+               goto out_unlock;
+       }
+       error = gfs2_find_jhead(jd, &head);
+       if (error) {
+               if (verbose)
+                       fs_err(sdp, "Error parsing journal for spectator "
+                              "mount.\n");
+               goto out_unlock;
+       }
+       if (!(head.lh_flags & GFS2_LOG_HEAD_UNMOUNT)) {
+               error = -EPERM;
+               if (verbose)
+                       fs_err(sdp, "jid=%u: Journal is dirty, so the first "
+                              "mounter must not be a spectator.\n",
+                              jd->jd_jid);
+       }
+
+out_unlock:
+       gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
+       return error;
+}
+
+static void signal_our_withdraw(struct gfs2_sbd *sdp)
+{
+       struct gfs2_glock *gl = sdp->sd_live_gh.gh_gl;
+       int ret = 0;
+       int tries;
+
+       /* Prevent any glock dq until withdraw recovery is complete */
+       set_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags);
+       /**
+        * Don't tell dlm we're bailing until we have no more buffers in the
+        * wind. If journal had an IO error, the log code should just purge
+        * the outstanding buffers rather than submitting new IO. Making the
+        * file system read-only will flush the journal, etc.
+        *
+        * During a normal unmount, gfs2_make_fs_ro calls gfs2_log_shutdown
+        * which clears SDF_JOURNAL_LIVE. In a withdraw, we cannot write
+        * any UNMOUNT log header, so we can't call gfs2_log_shutdown, and
+        * therefore we need to clear SDF_JOURNAL_LIVE manually.
+        */
+       clear_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags);
+       ret = gfs2_make_fs_ro(sdp, 1);
+       sdp->sd_vfs->s_flags |= SB_RDONLY;
+
+       /* Drop the glock for our journal so another node can recover it. */
+       gfs2_glock_dq_wait(&sdp->sd_journal_gh);
+       gfs2_holder_uninit(&sdp->sd_journal_gh);
+       sdp->sd_jinode_gh.gh_flags |= GL_NOCACHE;
+       gfs2_glock_dq_wait(&sdp->sd_jinode_gh);
+       /* holder_uninit to force glock_put, to force dlm to let go */
+       gfs2_holder_uninit(&sdp->sd_jinode_gh);
+       gfs2_jindex_free(sdp);
+       /* Flush the glock work so the glock is freed. This allows try locks
+        * on other nodes to be successful, otherwise we remain the owner of
+        * the glock until the workqueue gets around to running. */
+       gfs2_gl_flushwork(sdp);
+
+       if (sdp->sd_lockstruct.ls_ops->lm_lock == NULL) /* lock_nolock */
+               goto skip_recovery;
+       /**
+        * Dequeue the "live" glock, but keep a reference so it's never freed.
+        */
+       gfs2_glock_hold(gl);
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+       /**
+        * We enqueue the "live" glock in EX so that all other nodes
+        * get a demote request and act on it, demoting their glock
+        * from SHARED to UNLOCKED. Once we have the glock in EX, we
+        * know all other nodes have been informed of our departure.
+        * They cannot do anything more until our journal has been
+        * replayed and our locks released.
+        */
+       fs_warn(sdp, "Requesting recovery of jid %d.\n",
+               sdp->sd_lockstruct.ls_jid);
+       gfs2_holder_reinit(LM_ST_EXCLUSIVE, LM_FLAG_TRY_1CB | LM_FLAG_NOEXP,
+                          &sdp->sd_live_gh);
+       msleep(GL_GLOCK_MAX_HOLD);
+       /* This will likely fail in a cluster, but succeed stand-alone: */
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret == 0) {
+               gfs2_glock_dq_wait(&sdp->sd_live_gh);
+               gfs2_holder_reinit(LM_ST_SHARED, LM_FLAG_NOEXP | GL_EXACT,
+                                  &sdp->sd_live_gh);
+               gfs2_glock_nq(&sdp->sd_live_gh);
+       }
+       /* Now drop the additional reference we acquired */
+       gfs2_glock_queue_put(gl);
+       clear_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags);
+
+       /* Now wait until recovery complete. */
+       for (tries = 0; tries < 10; tries++) {
+               ret = check_journal_clean(sdp, sdp->sd_jdesc, false);
+               if (!ret)
+                       break;
+               msleep(HZ);
+               fs_warn(sdp, "Waiting for journal recovery jid %d.\n",
+                       sdp->sd_lockstruct.ls_jid);
+       }
+skip_recovery:
+       if (!ret)
+               fs_warn(sdp, "Journal recovery complete for jid %d.\n",
+                       sdp->sd_lockstruct.ls_jid);
+       else
+               fs_warn(sdp, "Journal recovery skipped for %d until next "
+                       "mount.\n", sdp->sd_lockstruct.ls_jid);
+       fs_warn(sdp, "Glock dequeues delayed: %lu\n", sdp->sd_glock_dqs_held);
+       sdp->sd_glock_dqs_held = 0;
+       wake_up_bit(&sdp->sd_flags, SDF_WITHDRAW_RECOVERY);
+}
+
 int gfs2_lm_withdraw(struct gfs2_sbd *sdp, const char *fmt, ...)
 {
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
@@ -63,6 +207,8 @@ int gfs2_lm_withdraw(struct gfs2_sbd *sdp, const char *fmt, 
...)
                fs_err(sdp, "about to withdraw this file system\n");
                BUG_ON(sdp->sd_args.ar_debug);
 
+               signal_our_withdraw(sdp);
+
                kobject_uevent(&sdp->sd_kobj, KOBJ_OFFLINE);
 
                if (!strcmp(sdp->sd_lockstruct.ls_ops->lm_proto_name, 
"lock_dlm"))
@@ -73,7 +219,7 @@ int gfs2_lm_withdraw(struct gfs2_sbd *sdp, const char *fmt, 
...)
                        lm->lm_unmount(sdp);
                }
                set_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags);
-               fs_err(sdp, "withdrawn\n");
+               fs_err(sdp, "File system withdrawn\n");
                dump_stack();
        }
 
diff --git a/fs/gfs2/util.h b/fs/gfs2/util.h
index 16e087da3bd3..036c7cfd856d 100644
--- a/fs/gfs2/util.h
+++ b/fs/gfs2/util.h
@@ -132,6 +132,9 @@ static inline void gfs2_metatype_set(struct buffer_head 
*bh, u16 type,
 int gfs2_io_error_i(struct gfs2_sbd *sdp, const char *function,
                    char *file, unsigned int line);
 
+extern int check_journal_clean(struct gfs2_sbd *sdp, struct gfs2_jdesc *jd,
+                              bool verbose);
+
 #define gfs2_io_error(sdp) \
 gfs2_io_error_i((sdp), __func__, __FILE__, __LINE__);
 
-- 
2.20.1

Reply via email to