HBASE-20576 Check remote WAL directory when creating peer and transiting peer 
to A


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5b63ce6b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5b63ce6b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5b63ce6b

Branch: refs/heads/HBASE-19064
Commit: 5b63ce6b6408439b3b251e4b7497d1878517e47f
Parents: 711c56e
Author: zhangduo <zhang...@apache.org>
Authored: Tue May 15 15:07:40 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Jun 26 14:26:32 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerManager.java     | 19 +++--
 ...ransitPeerSyncReplicationStateProcedure.java | 73 +++++++++++++-------
 .../replication/TestReplicationAdmin.java       | 57 ++++++++++++---
 3 files changed, 110 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5b63ce6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index e1d8b51..8e49137 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -31,6 +32,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -45,7 +47,6 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -193,9 +194,9 @@ public class ReplicationPeerManager {
   }
 
   /**
-   * @return the old state, and whether the peer is enabled.
+   * @return the old desciption of the peer
    */
-  Pair<SyncReplicationState, Boolean> 
preTransitPeerSyncReplicationState(String peerId,
+  ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
       SyncReplicationState state) throws DoNotRetryIOException {
     ReplicationPeerDescription desc = checkPeerExists(peerId);
     SyncReplicationState fromState = desc.getSyncReplicationState();
@@ -204,7 +205,7 @@ public class ReplicationPeerManager {
       throw new DoNotRetryIOException("Can not transit current cluster state 
from " + fromState +
         " to " + state + " for peer id=" + peerId);
     }
-    return Pair.newPair(fromState, desc.isEnabled());
+    return desc;
   }
 
   public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean 
enabled)
@@ -384,6 +385,16 @@ public class ReplicationPeerManager {
           "Only support replicated table config for sync replication peer");
       }
     }
+    Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
+    if (!remoteWALDir.isAbsolute()) {
+      throw new DoNotRetryIOException(
+        "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not 
absolute");
+    }
+    URI remoteWALDirUri = remoteWALDir.toUri();
+    if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() 
== null) {
+      throw new DoNotRetryIOException("The remote WAL directory " + 
peerConfig.getRemoteWALDir() +
+        " is not qualified, you must provide scheme and authority");
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b63ce6b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 0175296..ebe7a93 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -31,9 +32,9 @@ import 
org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,10 +114,20 @@ public class TransitPeerSyncReplicationStateProcedure
     if (cpHost != null) {
       cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
     }
-    Pair<SyncReplicationState, Boolean> pair =
+    ReplicationPeerDescription desc =
       
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, 
toState);
-    fromState = pair.getFirst();
-    enabled = pair.getSecond();
+    if (toState == SyncReplicationState.ACTIVE) {
+      Path remoteWALDirForPeer =
+        
ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), 
peerId);
+      // check whether the remote wal directory is present
+      if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
+        .exists(remoteWALDirForPeer)) {
+        throw new DoNotRetryIOException(
+          "The remote WAL directory " + remoteWALDirForPeer + " does not 
exist");
+      }
+    }
+    fromState = desc.getSyncReplicationState();
+    enabled = desc.isEnabled();
   }
 
   private void postTransit(MasterProcedureEnv env) throws IOException {
@@ -152,6 +163,36 @@ public class TransitPeerSyncReplicationStateProcedure
     }
   }
 
+  private void setNextStateAfterRefreshBegin() {
+    if (fromState.equals(SyncReplicationState.ACTIVE)) {
+      setNextState(toState.equals(SyncReplicationState.STANDBY)
+        ? 
PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+    } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
+      setNextState(toState.equals(SyncReplicationState.STANDBY)
+        ? 
PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+    } else {
+      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
+      
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+    }
+  }
+
+  private void setNextStateAfterRefreshEnd() {
+    if (toState == SyncReplicationState.STANDBY) {
+      setNextState(
+        enabled ? 
PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
+          : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+    } else {
+      setNextState(
+        
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
+    }
+  }
+
+  private void replayRemoteWAL() {
+    addChildProcedure(new RecoverStandbyProcedure[] { new 
RecoverStandbyProcedure(peerId) });
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env,
       PeerSyncReplicationStateTransitionState state)
@@ -191,21 +232,10 @@ public class TransitPeerSyncReplicationStateProcedure
         
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
           .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), 
sn, 0))
           .toArray(RefreshPeerProcedure[]::new));
-        if (fromState.equals(SyncReplicationState.ACTIVE)) {
-          setNextState(toState.equals(SyncReplicationState.STANDBY)
-            ? 
PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
-            : 
PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
-        } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
-          setNextState(toState.equals(SyncReplicationState.STANDBY)
-            ? 
PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
-            : 
PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
-        } else {
-          assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
-          
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
-        }
+        setNextStateAfterRefreshBegin();
         return Flow.HAS_MORE_STATE;
       case REPLAY_REMOTE_WAL_IN_PEER:
-        addChildProcedure(new RecoverStandbyProcedure(peerId));
+        replayRemoteWAL();
         setNextState(
           
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
         return Flow.HAS_MORE_STATE;
@@ -248,14 +278,7 @@ public class TransitPeerSyncReplicationStateProcedure
         
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
           .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), 
sn, 1))
           .toArray(RefreshPeerProcedure[]::new));
-        if (toState == SyncReplicationState.STANDBY) {
-          setNextState(
-            enabled ? 
PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
-              : 
PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
-        } else {
-          setNextState(
-            
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
-        }
+        setNextStateAfterRefreshEnd();
         return Flow.HAS_MORE_STATE;
       case SYNC_REPLICATION_SET_PEER_ENABLED:
         try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5b63ce6b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index ac98283..c6ffeea 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -49,6 +49,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import 
org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
 import 
org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest;
@@ -981,34 +982,37 @@ public class TestReplicationAdmin {
     ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
     assertNull(rpc.getRemoteWALDir());
 
+    builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
     try {
-      builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
       hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
       fail("Change remote wal dir is not allowed");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
 
     builder = ReplicationPeerConfig.newBuilder();
     builder.setClusterKey(KEY_SECOND);
-    builder.setRemoteWALDir(rootDir);
+    builder.setRemoteWALDir("whatever");
 
     try {
       hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
       fail("Only support replicated table config for sync replication");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
 
     builder.setReplicateAllUserTables(false);
+    Set<String> namespaces = new HashSet<String>();
+    namespaces.add("ns1");
+    builder.setNamespaces(namespaces);
     try {
-      Set<String> namespaces = new HashSet<String>();
-      namespaces.add("ns1");
-      builder.setNamespaces(namespaces);
       hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
       fail("Only support replicated table config for sync replication");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
 
     builder.setNamespaces(null);
@@ -1017,21 +1021,41 @@ public class TestReplicationAdmin {
       fail("Only support replicated table config for sync replication, and 
tables can't be empty");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
 
     Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, Arrays.asList("cf1"));
+    builder.setTableCFsMap(tableCfs);
     try {
-      tableCfs.put(tableName, Arrays.asList("cf1"));
-      builder.setTableCFsMap(tableCfs);
       hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
       fail("Only support replicated table config for sync replication");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
 
     tableCfs = new HashMap<>();
     tableCfs.put(tableName, new ArrayList<>());
     builder.setTableCFsMap(tableCfs);
+    try {
+      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+      fail("The remote WAL dir must be absolute");
+    } catch (Exception e) {
+      // OK
+      LOG.info("Expected error:", e);
+    }
+
+    builder.setRemoteWALDir("/hbase/remoteWALs");
+    try {
+      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+      fail("The remote WAL dir must be qualified");
+    } catch (Exception e) {
+      // OK
+      LOG.info("Expected error:", e);
+    }
+
+    builder.setRemoteWALDir(rootDir);
     hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
     rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
     assertEquals(rootDir, rpc.getRemoteWALDir());
@@ -1042,6 +1066,7 @@ public class TestReplicationAdmin {
       fail("Change remote wal dir is not allowed");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
 
     try {
@@ -1050,6 +1075,7 @@ public class TestReplicationAdmin {
       fail("Change remote wal dir is not allowed");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
 
     try {
@@ -1062,6 +1088,7 @@ public class TestReplicationAdmin {
         "Change replicated table config on an existing synchronous peer is not 
allowed");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
   }
 
@@ -1079,13 +1106,13 @@ public class TestReplicationAdmin {
     try {
       hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
         SyncReplicationState.DOWNGRADE_ACTIVE);
-      fail("Can't transit cluster state if replication peer don't config 
remote wal dir");
+      fail("Can't transit sync replication state if replication peer don't 
config remote wal dir");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
 
     Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
-    TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_SECOND));
     builder = ReplicationPeerConfig.newBuilder();
     builder.setClusterKey(KEY_SECOND);
     
builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
@@ -1106,6 +1133,15 @@ public class TestReplicationAdmin {
     assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
       hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
 
+    try {
+      hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, 
SyncReplicationState.ACTIVE);
+      fail("Can't transit sync replication state to ACTIVE if remote wal dir 
does not exist");
+    } catch (Exception e) {
+      // OK
+      LOG.info("Expected error:", e);
+    }
+    TEST_UTIL.getTestFileSystem()
+      .mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND));
     hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, 
SyncReplicationState.ACTIVE);
     assertEquals(SyncReplicationState.ACTIVE,
       hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
@@ -1133,9 +1169,10 @@ public class TestReplicationAdmin {
       hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
     try {
       hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, 
SyncReplicationState.ACTIVE);
-      fail("Can't transit cluster state from STANDBY to ACTIVE");
+      fail("Can't transit sync replication state from STANDBY to ACTIVE");
     } catch (Exception e) {
       // OK
+      LOG.info("Expected error:", e);
     }
     hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
       SyncReplicationState.DOWNGRADE_ACTIVE);

Reply via email to