HBASE-19078 Add a remote peer cluster wal directory config for synchronous replication
Signed-off-by: zhangduo <zhang...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cded0fb0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cded0fb0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cded0fb0 Branch: refs/heads/HBASE-19064 Commit: cded0fb0a9e27d72445139bd9d20f67db5d982ad Parents: fc7fee0 Author: Guanghao Zhang <zg...@apache.org> Authored: Sat Jan 13 18:55:28 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Mar 2 14:25:13 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerConfigUtil.java | 6 ++ .../replication/ReplicationPeerConfig.java | 21 +++++- .../ReplicationPeerConfigBuilder.java | 7 ++ .../src/main/protobuf/Replication.proto | 1 + .../replication/ReplicationPeerManager.java | 15 ++++ .../replication/TestReplicationAdmin.java | 77 ++++++++++++++++++++ .../src/main/ruby/hbase/replication_admin.rb | 17 +++-- hbase-shell/src/main/ruby/hbase_constants.rb | 1 + .../src/main/ruby/shell/commands/add_peer.rb | 21 +++++- .../src/main/ruby/shell/commands/list_peers.rb | 19 ++++- .../test/ruby/hbase/replication_admin_test.rb | 16 ++++ 11 files changed, 188 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index a234a9b..642149b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -315,6 +315,9 @@ public final class ReplicationPeerConfigUtil { excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); } + if (peer.hasRemoteWALDir()) { + builder.setRemoteWALDir(peer.getRemoteWALDir()); + } return builder.build(); } @@ -371,6 +374,9 @@ public final class ReplicationPeerConfigUtil { } } + if (peerConfig.getRemoteWALDir() != null) { + builder.setRemoteWALDir(peerConfig.getRemoteWALDir()); + } return builder.build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index bf8d030..4c10c46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -46,6 +46,8 @@ public class ReplicationPeerConfig { private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null; private Set<String> excludeNamespaces = null; private long bandwidth = 0; + // Used by synchronous replication + private String remoteWALDir; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -64,6 +66,7 @@ public class ReplicationPeerConfig { builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces) : null; this.bandwidth = builder.bandwidth; + this.remoteWALDir = builder.remoteWALDir; } private Map<TableName, List<String>> @@ -210,6 +213,10 @@ public class ReplicationPeerConfig { return this; } + public String getRemoteWALDir() { + return this.remoteWALDir; + } + public static ReplicationPeerConfigBuilder newBuilder() { return new ReplicationPeerConfigBuilderImpl(); } @@ -223,7 +230,8 @@ public class ReplicationPeerConfig { .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) - .setBandwidth(peerConfig.getBandwidth()); + .setBandwidth(peerConfig.getBandwidth()) + .setRemoteWALDir(peerConfig.getRemoteWALDir()); return builder; } @@ -250,6 +258,8 @@ public class ReplicationPeerConfig { private long bandwidth = 0; + private String remoteWALDir = null; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey; @@ -313,6 +323,12 @@ public class ReplicationPeerConfig { } @Override + public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) { + this.remoteWALDir = dir; + return this; + } + + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data // from ZK which makes it much more difficult. @@ -341,6 +357,9 @@ public class ReplicationPeerConfig { } } builder.append("bandwidth=").append(bandwidth); + if (this.remoteWALDir != null) { + builder.append(",remoteWALDir=").append(remoteWALDir); + } return builder.toString(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 0b2f2e2..eac98c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -138,6 +138,13 @@ public interface ReplicationPeerConfigBuilder { ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces); /** + * Set the remote peer cluster's wal directory. Used by synchronous replication. + * @param dir the remote peer cluster's wal directory + * @return {@code this} + */ + ReplicationPeerConfigBuilder setRemoteWALDir(String dir); + + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-protocol-shaded/src/main/protobuf/Replication.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 9f7b4c2..44295d8 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -48,6 +48,7 @@ message ReplicationPeer { optional bool replicate_all = 8; repeated TableCF exclude_table_cfs = 9; repeated bytes exclude_namespaces = 10; + optional string remoteWALDir = 11; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 19fc7f4..d715e2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -146,6 +146,21 @@ public class ReplicationPeerManager { oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); } + + if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) { + throw new DoNotRetryIOException( + "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " + + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId + + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); + } + + if (oldPeerConfig.getRemoteWALDir() != null) { + if (!ReplicationUtils.isKeyConfigEqual(oldPeerConfig, peerConfig)) { + throw new DoNotRetryIOException( + "Changing the replicated namespace/table config on a synchronous replication " + + "peer(peerId: " + peerId + ") is not allowed."); + } + } } public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 685c560..e471100 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -906,4 +906,81 @@ public class TestReplicationAdmin { // OK } } + + @Test + public void testPeerRemoteWALDir() throws Exception { + String rootDir = "hdfs://srv1:9999/hbase"; + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertNull(rpc.getRemoteWALDir()); + + try { + builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change remote wal dir is not allowed"); + } catch (Exception e) { + // OK + } + + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + builder.setRemoteWALDir(rootDir); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + assertEquals(rootDir, rpc.getRemoteWALDir()); + + try { + builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change remote wal dir is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder.setRemoteWALDir(null); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change remote wal dir is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + builder.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Set<String> namespaces = new HashSet<>(); + namespaces.add("ns1"); + builder.setExcludeNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Map<TableName, List<String>> tableCfs = new HashMap<>(); + tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>()); + builder.setExcludeTableCFsMap(tableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-shell/src/main/ruby/hbase/replication_admin.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index b9d4a0c..ba7d191 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -64,16 +64,20 @@ module Hbase table_cfs = args.fetch(TABLE_CFS, nil) namespaces = args.fetch(NAMESPACES, nil) peer_state = args.fetch(STATE, nil) + remote_wal_dir = args.fetch(REMOTE_WAL_DIR, nil) # Create and populate a ReplicationPeerConfig - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder() + builder = ReplicationPeerConfig.newBuilder() builder.set_cluster_key(cluster_key) unless endpoint_classname.nil? builder.set_replication_endpoint_impl(endpoint_classname) end + unless remote_wal_dir.nil? + builder.setRemoteWALDir(remote_wal_dir) + end + unless config.nil? builder.putAllConfiguration(config) end @@ -228,8 +232,7 @@ module Hbase namespaces.each do |n| ns_set.add(n) end - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder(rpc) + builder = ReplicationPeerConfig.newBuilder(rpc) builder.setNamespaces(ns_set) @admin.updateReplicationPeerConfig(id, builder.build) end @@ -248,8 +251,7 @@ module Hbase ns_set.remove(n) end end - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder(rpc) + builder = ReplicationPeerConfig.newBuilder(rpc) builder.setNamespaces(ns_set) @admin.updateReplicationPeerConfig(id, builder.build) end @@ -361,8 +363,7 @@ module Hbase # Create and populate a ReplicationPeerConfig replication_peer_config = get_peer_config(id) - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder(replication_peer_config) + builder = ReplicationPeerConfig.newBuilder(replication_peer_config) unless config.nil? builder.putAllConfiguration(config) end http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-shell/src/main/ruby/hbase_constants.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb index 28484cb..2870dfb 100644 --- a/hbase-shell/src/main/ruby/hbase_constants.rb +++ b/hbase-shell/src/main/ruby/hbase_constants.rb @@ -77,6 +77,7 @@ module HBaseConstants VALUE = 'VALUE'.freeze ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'.freeze CLUSTER_KEY = 'CLUSTER_KEY'.freeze + REMOTE_WAL_DIR = 'REMOTE_WAL_DIR'.freeze TABLE_CFS = 'TABLE_CFS'.freeze NAMESPACES = 'NAMESPACES'.freeze STATE = 'STATE'.freeze http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-shell/src/main/ruby/shell/commands/add_peer.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index eb2da83..4b6f294 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -35,7 +35,7 @@ to the peer cluster. An optional parameter for table column families identifies which tables and/or column families will be replicated to the peer cluster. -Notice: Set a namespace in the peer config means that all tables in this namespace +Note: Set a namespace in the peer config means that all tables in this namespace will be replicated to the peer cluster. So if you already have set a namespace in peer config, then you can't set this namespace's tables in the peer config again. @@ -74,6 +74,25 @@ the key TABLE_CFS. Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified. If ENDPOINT_CLASSNAME is specified, CLUSTER_KEY is optional and should only be specified if a particular custom endpoint requires it. +The default replication peer is asynchronous. You can also add a synchronous replication peer +with REMOTE_WAL_DIR parameter. Meanwhile, synchronous replication peer also support other optional +config for asynchronous replication peer. + +Examples: + + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + STATE => "ENABLED", REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + STATE => "DISABLED", REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase", NAMESPACES => ["ns1", "ns2"] + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase", TABLE_CFS => { "table1" => [] } + +Note: The REMOTE_WAL_DIR is not allowed to change. + EOF end http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-shell/src/main/ruby/shell/commands/list_peers.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index 522d23d..caeab86 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -39,7 +39,8 @@ EOF peers = replication_admin.list_peers formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME - STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH]) + REMOTE_ROOT_DIR STATE REPLICATE_ALL + NAMESPACES TABLE_CFS BANDWIDTH]) peers.each do |peer| id = peer.getPeerId @@ -52,8 +53,20 @@ EOF namespaces = replication_admin.show_peer_namespaces(config) tableCFs = replication_admin.show_peer_tableCFs_by_config(config) end - formatter.row([id, config.getClusterKey, - config.getReplicationEndpointImpl, state, + cluster_key = 'nil' + unless config.getClusterKey.nil? + cluster_key = config.getClusterKey + end + endpoint_classname = 'nil' + unless config.getReplicationEndpointImpl.nil? + endpoint_classname = config.getReplicationEndpointImpl + end + remote_root_dir = 'nil' + unless config.getRemoteWALDir.nil? + remote_root_dir = config.getRemoteWALDir + end + formatter.row([id, cluster_key, endpoint_classname, + remote_root_dir, state, config.replicateAllUserTables, namespaces, tableCFs, config.getBandwidth]) end http://git-wip-us.apache.org/repos/asf/hbase/blob/cded0fb0/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 0f84396..7f2b6ae 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -97,6 +97,22 @@ module Hbase command(:remove_peer, @peer_id) end + define_test "add_peer: remote wal dir" do + cluster_key = "server1.cie.com:2181:/hbase" + remote_wal_dir = "hdfs://srv1:9999/hbase" + args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + assert_equal(remote_wal_dir, peer.getPeerConfig.getRemoteWALDir) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + define_test "add_peer: single zk cluster key with enabled/disabled state" do cluster_key = "server1.cie.com:2181:/hbase"