HBASE-19630 Add peer cluster key check when adding a new replication peer Signed-off-by: zhangduo <zhang...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/621ab2cc Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/621ab2cc Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/621ab2cc Branch: refs/heads/HBASE-19397 Commit: 621ab2cc3dffbb7086d73d22d050d78a1de959e5 Parents: 7defbdc Author: Guanghao Zhang <zg...@apache.org> Authored: Tue Dec 26 21:10:00 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Dec 27 09:40:34 2017 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerManager.java | 54 ++++++++++++-------- .../replication/TestReplicationAdmin.java | 22 ++++++++ 2 files changed, 54 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/621ab2cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 84abfeb..b78cbce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.replication; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import 
org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -216,36 +218,36 @@ public final class ReplicationPeerManager { return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); } - /** - * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. - * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer - * cluster. - * <p> - * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. - * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. - */ - private static void checkPeerConfig(ReplicationPeerConfig peerConfig) - throws DoNotRetryIOException { + private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { + checkClusterKey(peerConfig.getClusterKey()); + if (peerConfig.replicateAllUserTables()) { - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || - (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + - "when you want replicate all cluster"); + // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. + // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer + // cluster. 
+ if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) + || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + + "when you want replicate all cluster"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), peerConfig.getExcludeTableCFsMap()); } else { - if ((peerConfig.getExcludeNamespaces() != null && - !peerConfig.getExcludeNamespaces().isEmpty()) || - (peerConfig.getExcludeTableCFsMap() != null && - !peerConfig.getExcludeTableCFsMap().isEmpty())) { + // If replicate_all flag is false, it means all user tables can't be replicated to peer + // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer + // cluster. + if ((peerConfig.getExcludeNamespaces() != null + && !peerConfig.getExcludeNamespaces().isEmpty()) + || (peerConfig.getExcludeTableCFsMap() != null + && !peerConfig.getExcludeTableCFsMap().isEmpty())) { throw new DoNotRetryIOException( - "Need clean exclude-namespaces or exclude-table-cfs config firstly" + - " when replicate_all flag is false"); + "Need clean exclude-namespaces or exclude-table-cfs config firstly" + + " when replicate_all flag is false"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); } + checkConfiguredWALEntryFilters(peerConfig); } @@ -268,7 +270,7 @@ public final class ReplicationPeerManager { * exclude namespace.</li> * </ol> */ - private static void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, + private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, Map<TableName, ? 
extends Collection<String>> tableCfs) throws DoNotRetryIOException { if (namespaces == null || namespaces.isEmpty()) { return; @@ -285,7 +287,7 @@ public final class ReplicationPeerManager { } } - private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) + private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { String filterCSV = peerConfig.getConfiguration() .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); @@ -302,6 +304,14 @@ public final class ReplicationPeerManager { } } + private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { + try { + ZKConfig.validateClusterKey(clusterKey); + } catch (IOException e) { + throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); + } + } + public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) throws ReplicationException { ReplicationPeerStorage peerStorage = http://git-wip-us.apache.org/repos/asf/hbase/blob/621ab2cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 8bb3230..a198d20 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -112,6 +112,28 @@ public class TestReplicationAdmin { } } + @Test + public void testAddInvalidPeer() { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + try { + String invalidPeerId = "1-2"; + hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build()); + fail("Should fail as the peer id: " + 
invalidPeerId + " is invalid"); + } catch (Exception e) { + // OK + } + + try { + String invalidClusterKey = "2181:/hbase"; + builder.setClusterKey(invalidClusterKey); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid"); + } catch (Exception e) { + // OK + } + } + /** * Simple testing of adding and removing peers, basically shows that * all interactions with ZK work