This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push: new 063e0e2 HBASE-26406 Can not add peer replicating to non-HBase (#3806) 063e0e2 is described below commit 063e0e2e92025430fece0c9504e16124fbbba43b Author: XinSun <ddu...@gmail.com> AuthorDate: Tue Nov 2 14:26:25 2021 +0800 HBASE-26406 Can not add peer replicating to non-HBase (#3806) Signed-off-by: Rushabh Shah <shahr...@gmail.com> Signed-off-by: Duo Zhang <zhang...@apache.org> (cherry picked from commit b9b7fec57f9de5407c63467780f454345963c2a0) --- .../master/replication/ReplicationPeerManager.java | 16 +- .../TestNonHBaseReplicationEndpoint.java | 205 +++++++++++++++++++++ 2 files changed, 213 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 9d8c9e1..f826d5d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -46,7 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; -import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -268,13 +268,13 @@ public class ReplicationPeerManager { e); } } - // Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key - if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) { + // Endpoints implementing HBaseReplicationEndpoint need to check cluster key + if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) { checkClusterKey(peerConfig.getClusterKey()); - } - // Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster - if (endpoint == null || !endpoint.canReplicateToSameCluster()) { - checkClusterId(peerConfig.getClusterKey()); + // Check if endpoint can replicate to the same cluster + if (endpoint == null || !endpoint.canReplicateToSameCluster()) { + checkSameClusterKey(peerConfig.getClusterKey()); + } } if (peerConfig.replicateAllUserTables()) { @@ -368,7 +368,7 @@ public class ReplicationPeerManager { } } - private void checkClusterId(String clusterKey) throws DoNotRetryIOException { + private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException { String peerClusterId = ""; try { // Create the peer cluster config for get peer cluster id diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java new file mode 100644 index 0000000..b1a8bf5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ReplicationTests.class }) +public class TestNonHBaseReplicationEndpoint { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestNonHBaseReplicationEndpoint.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static Admin ADMIN; + + private static final TableName tableName = TableName.valueOf("test"); + private static final byte[] famName = Bytes.toBytes("f"); + + private static final AtomicBoolean REPLICATED = new AtomicBoolean(); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + UTIL.startMiniCluster(); + ADMIN = UTIL.getAdmin(); + } + + @AfterClass + public static void teardownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setup() { + REPLICATED.set(false); + } + + @Test + public void test() throws IOException { + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) + .build(); + Table table = UTIL.createTable(td, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setReplicationEndpointImpl(NonHBaseReplicationEndpoint.class.getName()) + .setReplicateAllUserTables(false) + .setTableCFsMap(new HashMap<TableName, List<String>>() {{ + put(tableName, new ArrayList<>()); + } + }).build(); + + ADMIN.addReplicationPeer("1", peerConfig); + loadData(table); + + UTIL.waitFor(10000L, () -> REPLICATED.get()); + } + + protected static void loadData(Table table) throws IOException { + for (int i = 0; i < 100; i++) { + Put put = new Put(Bytes.toBytes(Integer.toString(i))); + put.addColumn(famName, famName, Bytes.toBytes(i)); + table.put(put); + } + } + + public static class NonHBaseReplicationEndpoint implements ReplicationEndpoint { + + private boolean running = false; + + @Override + public void init(Context context) throws IOException { + } + + @Override + public boolean canReplicateToSameCluster() { + return false; + } + + @Override + public UUID getPeerUUID() { + return UUID.randomUUID(); + } + + @Override + public WALEntryFilter getWALEntryfilter() { + return null; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + REPLICATED.set(true); + return true; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + public boolean isStarting() { + return false; + } + + @Override + public void start() { + running = true; + } + + @Override + public void awaitRunning() { + long interval = 100L; + while (!running) { + Threads.sleep(interval); + } + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) { + long start = System.currentTimeMillis(); + long end = start + unit.toMillis(timeout); + long interval = 100L; + while (!running && System.currentTimeMillis() < end) { + Threads.sleep(interval); + } + } + + @Override + public void stop() { + running = false; + } + + @Override + public void awaitTerminated() { + long interval = 100L; + while (running) { + Threads.sleep(interval); + } + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) { + long start = System.currentTimeMillis(); + long end = start + unit.toMillis(timeout); + long interval = 100L; + while (running && System.currentTimeMillis() < end) { + Threads.sleep(interval); + } + } + + @Override + public Throwable failureCause() { + return null; + } + + @Override + public void peerConfigUpdated(ReplicationPeerConfig rpc) { + } + } +} \ No newline at end of file