HBASE-19935 Only allow table replication for sync replication for now
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e935a4c3 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e935a4c3 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e935a4c3 Branch: refs/heads/HBASE-19064 Commit: e935a4c3c882b7077e3b572e89bc47ff13455910 Parents: c8c774b Author: Guanghao Zhang <zg...@apache.org> Authored: Tue Feb 6 16:00:59 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri May 25 10:11:48 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerConfig.java | 9 +++ .../replication/ReplicationPeerManager.java | 34 ++++++++- .../replication/TestReplicationAdmin.java | 73 ++++++++++++++------ .../wal/TestCombinedAsyncWriter.java | 6 ++ .../wal/TestSyncReplicationWALProvider.java | 6 ++ 5 files changed, 102 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e935a4c3/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 97abc74..997a155 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; + +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -220,6 +222,13 @@ public class ReplicationPeerConfig { return this.remoteWALDir; } + /** + * Use remote wal dir to decide whether a peer is sync replication peer + */ + public boolean isSyncReplication() { + return !StringUtils.isBlank(this.remoteWALDir); + } + public static ReplicationPeerConfigBuilder newBuilder() { return new ReplicationPeerConfigBuilderImpl(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/e935a4c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index f07a0d8..ff778a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -170,7 +170,7 @@ public class ReplicationPeerManager { " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); } - if (oldPeerConfig.getRemoteWALDir() != null) { + if (oldPeerConfig.isSyncReplication()) { if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) { throw new DoNotRetryIOException( "Changing the replicated namespace/table config on a synchronous replication " + @@ -199,8 +199,8 @@ public class ReplicationPeerManager { } ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); SyncReplicationState syncReplicationState = - StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? SyncReplicationState.NONE - : SyncReplicationState.DOWNGRADE_ACTIVE; + copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE + : SyncReplicationState.NONE; peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState)); @@ -324,9 +324,37 @@ public class ReplicationPeerManager { peerConfig.getTableCFsMap()); } + if (peerConfig.isSyncReplication()) { + checkPeerConfigForSyncReplication(peerConfig); + } + checkConfiguredWALEntryFilters(peerConfig); } + private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig) + throws DoNotRetryIOException { + // This is used to reduce the difficulty for implementing the sync replication state transition + // as we need to reopen all the related regions. + // TODO: Add namespace, replicat_all flag back + if (peerConfig.replicateAllUserTables()) { + throw new DoNotRetryIOException( + "Only support replicated table config for sync replication peer"); + } + if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) { + throw new DoNotRetryIOException( + "Only support replicated table config for sync replication peer"); + } + if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) { + throw new DoNotRetryIOException("Need config replicated tables for sync replication peer"); + } + for (List<String> cfs : peerConfig.getTableCFsMap().values()) { + if (cfs != null && !cfs.isEmpty()) { + throw new DoNotRetryIOException( + "Only support replicated table config for sync replication peer"); + } + } + } + /** * Set a namespace in the peer config means that all tables in this namespace will be replicated * to the peer cluster. http://git-wip-us.apache.org/repos/asf/hbase/blob/e935a4c3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index a7710e7..d462dbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -910,6 +911,8 @@ public class TestReplicationAdmin { @Test public void testPeerRemoteWALDir() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + String rootDir = "hdfs://srv1:9999/hbase"; ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_ONE); @@ -929,57 +932,74 @@ public class TestReplicationAdmin { builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); builder.setRemoteWALDir(rootDir); - hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); - rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); - assertEquals(rootDir, rpc.getRemoteWALDir()); + try { + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication"); + } catch (Exception e) { + // OK + } + builder.setReplicateAllUserTables(false); try { - builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail("Change remote wal dir is not allowed"); + Set<String> namespaces = new HashSet<String>(); + namespaces.add("ns1"); + builder.setNamespaces(namespaces); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication"); } catch (Exception e) { // OK } + builder.setNamespaces(null); try { - builder.setRemoteWALDir(null); - hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail("Change remote wal dir is not allowed"); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication, and tables can't be empty"); } catch (Exception e) { // OK } + Map<TableName, List<String>> tableCfs = new HashMap<>(); try { - builder = ReplicationPeerConfig.newBuilder(rpc); - builder.setReplicateAllUserTables(false); + tableCfs.put(tableName, Arrays.asList("cf1")); + builder.setTableCFsMap(tableCfs); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + fail("Only support replicated table config for sync replication"); + } catch (Exception e) { + // OK + } + + tableCfs = new HashMap<>(); + tableCfs.put(tableName, new ArrayList<>()); + builder.setTableCFsMap(tableCfs); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + assertEquals(rootDir, rpc.getRemoteWALDir()); + + try { + builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + fail("Change remote wal dir is not allowed"); } catch (Exception e) { // OK } try { - builder = ReplicationPeerConfig.newBuilder(rpc); - Set<String> namespaces = new HashSet<>(); - namespaces.add("ns1"); - builder.setExcludeNamespaces(namespaces); + builder.setRemoteWALDir(null); hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); - fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + fail("Change remote wal dir is not allowed"); } catch (Exception e) { // OK } try { builder = ReplicationPeerConfig.newBuilder(rpc); - Map<TableName, List<String>> tableCfs = new HashMap<>(); - tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>()); - builder.setExcludeTableCFsMap(tableCfs); + tableCfs = new HashMap<>(); + tableCfs.put(TableName.valueOf("ns1:" + name.getMethodName()), new ArrayList<>()); + builder.setTableCFsMap(tableCfs); hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); fail( - "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + "Change replicated table config on an existing synchronous peer is not allowed"); } catch (Exception e) { // OK } @@ -987,8 +1007,11 @@ public class TestReplicationAdmin { @Test public void testTransitSyncReplicationPeerState() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_ONE); + builder.setReplicateAllUserTables(false); hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); assertEquals(SyncReplicationState.NONE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE)); @@ -1005,6 +1028,10 @@ public class TestReplicationAdmin { builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); builder.setRemoteWALDir(rootDir); + builder.setReplicateAllUserTables(false); + Map<TableName, List<String>> tableCfs = new HashMap<>(); + tableCfs.put(tableName, new ArrayList<>()); + builder.setTableCFsMap(tableCfs); hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); http://git-wip-us.apache.org/repos/asf/hbase/blob/e935a4c3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java index 36dbe0f..07aa6a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -51,6 +53,10 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; @Category({ RegionServerTests.class, MediumTests.class }) public class TestCombinedAsyncWriter { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static EventLoopGroup EVENT_LOOP_GROUP; http://git-wip-us.apache.org/repos/asf/hbase/blob/e935a4c3/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 60a9e13..f09e51e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.util.Optional; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; @@ -41,12 +42,17 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ RegionServerTests.class, MediumTests.class }) public class TestSyncReplicationWALProvider { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static String PEER_ID = "1";