HBASE-19990 Create remote wal directory when transitting to state S

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4838abae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4838abae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4838abae

Branch: refs/heads/HBASE-19064
Commit: 4838abae0bf063d23c0a9e0704d30f4e9a6edaa2
Parents: ba46605
Author: zhangduo <zhang...@apache.org>
Authored: Wed Feb 14 16:01:16 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Jun 21 10:20:15 2018 +0800

----------------------------------------------------------------------
 .../procedure2/ProcedureYieldException.java     |  9 ++++--
 .../hbase/replication/ReplicationUtils.java     |  2 ++
 .../hadoop/hbase/master/MasterFileSystem.java   | 19 ++++++-------
 .../master/procedure/MasterProcedureEnv.java    |  5 ++++
 ...ransitPeerSyncReplicationStateProcedure.java | 29 ++++++++++++++++----
 .../hbase/replication/TestSyncReplication.java  |  8 ++++++
 6 files changed, 55 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4838abae/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
index 0487ac5b..dbb9981 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
@@ -15,16 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.procedure2;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
-// TODO: Not used yet
+/**
+ * Indicate that a procedure wants to be rescheduled. Usually because there are something wrong but
+ * we do not want to fail the procedure.
+ * <p>
+ * TODO: need to support scheduling after a delay.
+ */
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class ProcedureYieldException extends ProcedureException {
+
   /** default constructor */
   public ProcedureYieldException() {
     super();

http://git-wip-us.apache.org/repos/asf/hbase/blob/4838abae/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index d94cb00..e402d0f 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -41,6 +41,8 @@ public final class ReplicationUtils {
 
   public static final String REPLICATION_ATTR_NAME = "__rep__";
 
+  public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
+
   private ReplicationUtils() {
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4838abae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 864be02..7ccbd71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -133,7 +134,6 @@ public class MasterFileSystem {
    * Idempotent.
    */
   private void createInitialFileSystemLayout() throws IOException {
-
     final String[] protectedSubDirs = new String[] {
         HConstants.BASE_NAMESPACE_DIR,
         HConstants.HFILE_ARCHIVE_DIRECTORY,
@@ -145,7 +145,8 @@ public class MasterFileSystem {
       HConstants.HREGION_LOGDIR_NAME,
       HConstants.HREGION_OLDLOGDIR_NAME,
       HConstants.CORRUPT_DIR_NAME,
-      WALProcedureStore.MASTER_PROCEDURE_LOGDIR
+      WALProcedureStore.MASTER_PROCEDURE_LOGDIR,
+      ReplicationUtils.REMOTE_WAL_DIR_NAME
     };
     // check if the root directory exists
     checkRootDir(this.rootdir, conf, this.fs);
@@ -192,7 +193,9 @@ public class MasterFileSystem {
     return this.fs;
   }
 
-  protected FileSystem getWALFileSystem() { return this.walFs; }
+  public FileSystem getWALFileSystem() {
+    return this.walFs;
+  }
 
   public Configuration getConfiguration() {
     return this.conf;
@@ -234,13 +237,9 @@ public class MasterFileSystem {
   }
 
   /**
-   * Get the rootdir.  Make sure its wholesome and exists before returning.
-   * @param rd
-   * @param c
-   * @param fs
-   * @return hbase.rootdir (after checks for existence and bootstrapping if
-   * needed populating the directory with necessary bootup files).
-   * @throws IOException
+   * Get the rootdir. Make sure its wholesome and exists before returning.
+   * @return hbase.rootdir (after checks for existence and bootstrapping if needed populating the
+   *         directory with necessary bootup files).
    */
   private Path checkRootDir(final Path rd, final Configuration c, final FileSystem fs)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4838abae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 0ec932c..2c01b16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
@@ -139,6 +140,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
     return master.getReplicationPeerManager();
   }
 
+  public MasterFileSystem getMasterFileSystem() {
+    return master.getMasterFileSystem();
+  }
+
   public boolean isRunning() {
     if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false;
     return master.getMasterProcedureExecutor().isRunning();

http://git-wip-us.apache.org/repos/asf/hbase/blob/4838abae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 69404a0..cc51890 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.hbase.master.replication;
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -44,7 +48,7 @@ public class TransitPeerSyncReplicationStateProcedure
     extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {
 
   private static final Logger LOG =
-    LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
+      LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
 
   private SyncReplicationState fromState;
 
@@ -67,8 +71,8 @@ public class TransitPeerSyncReplicationStateProcedure
   protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.serializeStateData(serializer);
     TransitPeerSyncReplicationStateStateData.Builder builder =
-      TransitPeerSyncReplicationStateStateData.newBuilder()
-        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
+        TransitPeerSyncReplicationStateStateData.newBuilder()
+          .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
     if (fromState != null) {
       builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
     }
@@ -79,7 +83,7 @@ public class TransitPeerSyncReplicationStateProcedure
   protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.deserializeStateData(serializer);
     TransitPeerSyncReplicationStateStateData data =
-      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
+        serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
     toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
     if (data.hasFromState()) {
       fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
@@ -205,7 +209,22 @@ public class TransitPeerSyncReplicationStateProcedure
         }
         return Flow.HAS_MORE_STATE;
       case CREATE_DIR_FOR_REMOTE_WAL:
-        // TODO: create wal for write remote wal
+        MasterFileSystem mfs = env.getMasterFileSystem();
+        Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
+        Path remoteWALDirForPeer = new Path(remoteWALDir, peerId);
+        FileSystem walFs = mfs.getWALFileSystem();
+        try {
+          if (walFs.exists(remoteWALDirForPeer)) {
+            LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
+              remoteWALDirForPeer);
+          } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
+            LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
+            throw new ProcedureYieldException();
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to create remote wal dir {}", remoteWALDirForPeer, e);
+          throw new ProcedureYieldException();
+        }
         setNextState(
           PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
         return Flow.HAS_MORE_STATE;

http://git-wip-us.apache.org/repos/asf/hbase/blob/4838abae/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
index acddc4a..196019d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -154,8 +157,13 @@ public class TestSyncReplication {
 
   @Test
   public void testStandby() throws Exception {
+    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
+    Path remoteWALDirForPeer = new Path(remoteWALDir, PEER_ID);
+    assertFalse(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.STANDBY);
+    assertTrue(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
     try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
       assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
       assertDisallow(table,

Reply via email to