This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 5bb76bf  Revert "HBASE-24743 Reject to add a peer which replicate to itself earlier (#2071)"
5bb76bf is described below

commit 5bb76bf22dd9c0f41369a20f2a3cd1d10d14f047
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Tue Jul 21 20:20:51 2020 +0530

    Revert "HBASE-24743 Reject to add a peer which replicate to itself earlier (#2071)"
    
    This reverts commit 5db3ec2cfb0bc44c99d7bd3bc61a45ecd258c3b6.
    
    TestReplicationAdmin and TestReplicationShell are broken on branch-2 and master, respectively.
---
 .../org/apache/hadoop/hbase/master/HMaster.java    |  2 +-
 .../master/replication/ReplicationPeerManager.java | 50 +++++-----------------
 .../regionserver/ReplicationSource.java            | 10 +++++
 .../hbase/client/TestAsyncReplicationAdminApi.java |  6 +--
 .../replication/SerialReplicationTestBase.java     |  5 ---
 .../hbase/replication/TestMasterReplication.java   | 42 +++++++++++++++---
 .../hbase/replication/TestReplicationEndpoint.java |  5 ---
 .../TestRaceWhenCreatingReplicationSource.java     |  5 ---
 .../TestHBaseFsckCleanReplicationBarriers.java     |  5 +--
 9 files changed, 62 insertions(+), 68 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0cf6de4..d01f357 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -801,7 +801,7 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
     this.splitOrMergeTracker.start();
 
-    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, 
conf, clusterId);
+    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, 
conf);
 
     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, 
this.serverManager);
     this.drainingServerTracker.start();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 0823ac0..29a8a1b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -31,7 +31,6 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
@@ -46,11 +45,9 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Manages and performs all replication admin operations.
@@ -66,17 +63,11 @@ public class ReplicationPeerManager {
 
   private final ConcurrentMap<String, ReplicationPeerDescription> peers;
 
-  private final String clusterId;
-
-  private final Configuration conf;
-
   ReplicationPeerManager(ReplicationPeerStorage peerStorage, 
ReplicationQueueStorage queueStorage,
-    ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration 
conf, String clusterId) {
+      ConcurrentMap<String, ReplicationPeerDescription> peers) {
     this.peerStorage = peerStorage;
     this.queueStorage = queueStorage;
     this.peers = peers;
-    this.conf = conf;
-    this.clusterId = clusterId;
   }
 
   private void checkQueuesDeleted(String peerId)
@@ -254,10 +245,11 @@ public class ReplicationPeerManager {
 
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws 
DoNotRetryIOException {
     String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
-    ReplicationEndpoint endpoint = null;
+    boolean checkClusterKey = true;
     if (!StringUtils.isBlank(replicationEndpointImpl)) {
+      // try creating a instance
+      ReplicationEndpoint endpoint;
       try {
-        // try creating a instance
         endpoint = Class.forName(replicationEndpointImpl)
           
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
       } catch (Throwable e) {
@@ -265,15 +257,14 @@ public class ReplicationPeerManager {
           "Can not instantiate configured replication endpoint class=" + 
replicationEndpointImpl,
           e);
       }
+      // do not check cluster key if we are not 
HBaseInterClusterReplicationEndpoint
+      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
+        checkClusterKey = false;
+      }
     }
-    // Default is HBaseInterClusterReplicationEndpoint and only it need to 
check cluster key
-    if (endpoint == null || endpoint instanceof 
HBaseInterClusterReplicationEndpoint) {
+    if (checkClusterKey) {
       checkClusterKey(peerConfig.getClusterKey());
     }
-    // Default is HBaseInterClusterReplicationEndpoint which cannot replicate 
to same cluster
-    if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
-      checkClusterId(peerConfig.getClusterKey());
-    }
 
     if (peerConfig.replicateAllUserTables()) {
       // If replicate_all flag is true, it means all user tables will be 
replicated to peer cluster.
@@ -366,25 +357,6 @@ public class ReplicationPeerManager {
     }
   }
 
-  private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
-    String peerClusterId = "";
-    try {
-      // Create the peer cluster config for get peer cluster id
-      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, 
clusterKey);
-      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + 
"check-peer-cluster-id", null)) {
-        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
-      }
-    } catch (IOException | KeeperException e) {
-      throw new DoNotRetryIOException("Can't get peerClusterId for 
clusterKey=" + clusterKey, e);
-    }
-    // In rare case, zookeeper setting may be messed up. That leads to the 
incorrect
-    // peerClusterId value, which is the same as the source clusterId
-    if (clusterId.equals(peerClusterId)) {
-      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
-        + ", should not replicate to itself for 
HBaseInterClusterReplicationEndpoint");
-    }
-  }
-
   public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
     return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
       .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> 
p.getPeerId())
@@ -395,7 +367,7 @@ public class ReplicationPeerManager {
     return queueStorage;
   }
 
-  public static ReplicationPeerManager create(ZKWatcher zk, Configuration 
conf, String clusterId)
+  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
       throws ReplicationException {
     ReplicationPeerStorage peerStorage =
       ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
@@ -406,7 +378,7 @@ public class ReplicationPeerManager {
       peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, 
peerConfig));
     }
     return new ReplicationPeerManager(peerStorage,
-      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, 
conf, clusterId);
+      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c283b5c..bf6ab7c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -524,6 +524,16 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     if (!this.isSourceActive()) {
       return;
     }
+
+    // In rare case, zookeeper setting may be messed up. That leads to the 
incorrect
+    // peerClusterId value, which is the same as the source clusterId
+    if (clusterId.equals(peerClusterId) && 
!replicationEndpoint.canReplicateToSameCluster()) {
+      this.terminate("ClusterId " + clusterId + " is replicating to itself: 
peerClusterId "
+          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+          + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.removeSource(this);
+      return;
+    }
     LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer 
cluster: {};",
       logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, 
peerClusterId);
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index 966b119..b68ca01 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends 
TestAsyncAdminBase {
     HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
 
   private final String ID_ONE = "1";
-  private static String KEY_ONE;
+  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
   private final String ID_TWO = "2";
-  private static String KEY_TWO;
+  private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -82,8 +82,6 @@ public class TestAsyncReplicationAdminApi extends 
TestAsyncAdminBase {
     
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
     TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
     TEST_UTIL.startMiniCluster();
-    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
-    KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
     ASYNC_CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index 8c5c78c..04cf392 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -113,11 +113,6 @@ public class SerialReplicationTestBase {
     protected void doStop() {
       notifyStopped();
     }
-
-    @Override
-    public boolean canReplicateToSameCluster() {
-      return true;
-    }
   }
 
   @BeforeClass
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 3be66e5..37ca7dc 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
@@ -33,14 +34,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -68,7 +72,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -170,16 +176,40 @@ public class TestMasterReplication {
 
   /**
    * Tests the replication scenario 0 -> 0. By default
-   * {@link 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
-   * the replication peer should not be added.
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns 
false, so the
+   * ReplicationSource should terminate, and no further logs should get 
enqueued
    */
-  @Test(expected = DoNotRetryIOException.class)
-  public void testLoopedReplication()
-    throws Exception {
+  @Test
+  public void testLoopedReplication() throws Exception {
     LOG.info("testLoopedReplication");
     startMiniClusters(1);
     createTableOnClusters(table);
     addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = 
utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() 
{
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterMetrics clusterStatus = utilities[0].getAdmin()
+            .getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
+        ServerMetrics serverLoad = 
clusterStatus.getLiveServerMetrics().get(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.isEmpty();
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
+      ZNodePaths.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, 
rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
   }
 
   /**
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 6fb24e0..642139c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -500,11 +500,6 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
       stoppedCount.incrementAndGet();
       notifyStopped();
     }
-
-    @Override
-    public boolean canReplicateToSameCluster() {
-      return true;
-    }
   }
 
   public static class InterClusterReplicationEndpointForTest
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
index 522fb20..bd800a8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -127,11 +127,6 @@ public class TestRaceWhenCreatingReplicationSource {
     protected void doStop() {
       notifyStopped();
     }
-
-    @Override
-    public boolean canReplicateToSameCluster() {
-      return true;
-    }
   }
 
   @BeforeClass
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
index 271609d..375f2ed 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
@@ -174,9 +174,8 @@ public class TestHBaseFsckCleanReplicationBarriers {
   }
 
   public static void createPeer() throws IOException {
-    ReplicationPeerConfig rpc =
-      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + 
"-test")
-        .setSerial(true).build();
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
     UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
     UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
   }

Reply via email to