HBASE-19623 Create replication endpoint asynchronously when adding a replication source


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3be13975
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3be13975
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3be13975

Branch: refs/heads/HBASE-19397-branch-2
Commit: 3be13975aeda434cc4816f7c10ca30213a58994b
Parents: 7bb1768
Author: zhangduo <zhang...@apache.org>
Authored: Tue Jan 2 13:25:58 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Sun Feb 4 20:42:08 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationPeer.java      |   8 ++
 .../hbase/replication/ReplicationPeers.java     |  18 +--
 .../replication/ZKReplicationPeerStorage.java   |   7 +-
 .../replication/TestReplicationStateBasic.java  |  20 +---
 .../TestZKReplicationPeerStorage.java           |  14 +--
 .../HBaseInterClusterReplicationEndpoint.java   |  17 ++-
 .../RecoveredReplicationSource.java             |  13 +--
 .../regionserver/ReplicationSource.java         | 110 +++++++++++--------
 .../ReplicationSourceInterface.java             |   8 +-
 .../regionserver/ReplicationSourceManager.java  |  47 +-------
 .../client/TestAsyncReplicationAdminApi.java    |   2 -
 .../replication/TestReplicationAdmin.java       |   2 -
 .../replication/ReplicationSourceDummy.java     |   7 +-
 .../replication/TestReplicationSource.java      |   5 +-
 .../TestReplicationSourceManager.java           |   8 +-
 15 files changed, 116 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 4846018..2da3cce 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -54,6 +54,14 @@ public interface ReplicationPeer {
   PeerState getPeerState();
 
   /**
+   * Test whether the peer is enabled.
+   * @return {@code true} if enabled, otherwise {@code false}.
+   */
+  default boolean isPeerEnabled() {
+    return getPeerState() == PeerState.ENABLED;
+  }
+
+  /**
    * Get the peer config object
    * @return the ReplicationPeerConfig for this peer
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 422801b..45940a5 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -86,21 +87,6 @@ public class ReplicationPeers {
   }
 
   /**
-   * Get the peer state for the specified connected remote slave cluster. The 
value might be read
-   * from cache, so it is recommended to use {@link #peerStorage } to read 
storage directly if
-   * reading the state after enabling or disabling it.
-   * @param peerId a short that identifies the cluster
-   * @return true if replication is enabled, false otherwise.
-   */
-  public boolean isPeerEnabled(String peerId) {
-    ReplicationPeer replicationPeer = this.peerCache.get(peerId);
-    if (replicationPeer == null) {
-      throw new IllegalArgumentException("Peer with id= " + peerId + " is not 
cached");
-    }
-    return replicationPeer.getPeerState() == PeerState.ENABLED;
-  }
-
-  /**
    * Returns the ReplicationPeerImpl for the specified cached peer. This 
ReplicationPeer will
    * continue to track changes to the Peer's state and config. This method 
returns null if no peer
    * has been cached with the given peerId.
@@ -117,7 +103,7 @@ public class ReplicationPeers {
    * @return a Set of Strings for peerIds
    */
   public Set<String> getAllPeerIds() {
-    return peerCache.keySet();
+    return Collections.unmodifiableSet(peerCache.keySet());
   }
 
   public static Configuration 
getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index bf448e8..42d4b3f 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
 
 import java.util.Arrays;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -30,8 +29,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 
@@ -41,8 +38,6 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 @InterfaceAudience.Private
 class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements 
ReplicationPeerStorage {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ZKReplicationPeerStorage.class);
-
   public static final byte[] ENABLED_ZNODE_BYTES =
     toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
   public static final byte[] DISABLED_ZNODE_BYTES =
@@ -126,7 +121,7 @@ class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase implements Repli
   @Override
   public List<String> listPeerIds() throws ReplicationException {
     try {
-      return 
CollectionUtils.nullToEmpty(ZKUtil.listChildrenAndWatchThem(zookeeper, 
peersZNode));
+      return CollectionUtils.nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, 
peersZNode));
     } catch (KeeperException e) {
       throw new ReplicationException("Cannot get the list of peers", e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 07c6c15..f3eeccc 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -225,11 +225,6 @@ public abstract class TestReplicationStateBasic {
       fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
     } catch (ReplicationException e) {
     }
-    try {
-      rp.isPeerEnabled("bogus");
-      fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
-    } catch (IllegalArgumentException e) {
-    }
 
     try {
       assertFalse(rp.addPeer("bogus"));
@@ -245,12 +240,6 @@ public abstract class TestReplicationStateBasic {
     rp.getPeerStorage().addPeer(ID_TWO, new 
ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
     assertNumberOfPeers(2);
 
-    // Test methods with a peer that is added but not connected
-    try {
-      rp.isPeerEnabled(ID_ONE);
-      fail("There are no connected peers, should have thrown an 
IllegalArgumentException");
-    } catch (IllegalArgumentException e) {
-    }
     assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
         
.getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), 
rp.getConf())));
     rp.getPeerStorage().removePeer(ID_ONE);
@@ -261,7 +250,7 @@ public abstract class TestReplicationStateBasic {
     rp.getPeerStorage().addPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
     rp.addPeer(ID_ONE);
     assertNumberOfPeers(2);
-    assertTrue(rp.isPeerEnabled(ID_ONE));
+    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
     rp.getPeerStorage().setPeerState(ID_ONE, false);
     // now we do not rely on zk watcher to trigger the state change so we need 
to trigger it
     // manually...
@@ -279,11 +268,6 @@ public abstract class TestReplicationStateBasic {
     // Disconnect peer
     rp.removePeer(ID_ONE);
     assertNumberOfPeers(2);
-    try {
-      rp.isPeerEnabled(ID_ONE);
-      fail("There are no connected peers, should have thrown an 
IllegalArgumentException");
-    } catch (IllegalArgumentException e) {
-    }
   }
 
   protected void assertConnectedPeerStatus(boolean status, String peerId) 
throws Exception {
@@ -292,7 +276,7 @@ public abstract class TestReplicationStateBasic {
       fail("ConnectedPeerStatus was " + !status + " but expected " + status + 
" in ZK");
     }
     while (true) {
-      if (status == rp.isPeerEnabled(peerId)) {
+      if (status == rp.getPeer(peerId).isPeerEnabled()) {
         return;
       }
       if (zkTimeoutCount < ZK_MAX_COUNT) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index e8098c8..3eb11da 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -80,15 +80,11 @@ public class TestZKReplicationPeerStorage {
 
   private ReplicationPeerConfig getConfig(int seed) {
     Random rand = new Random(seed);
-    ReplicationPeerConfig config = new ReplicationPeerConfig();
-    config.setClusterKey(Long.toHexString(rand.nextLong()));
-    config.setReplicationEndpointImpl(Long.toHexString(rand.nextLong()));
-    config.setNamespaces(randNamespaces(rand));
-    config.setExcludeNamespaces(randNamespaces(rand));
-    config.setTableCFsMap(randTableCFs(rand));
-    config.setReplicateAllUserTables(rand.nextBoolean());
-    config.setBandwidth(rand.nextInt(1000));
-    return config;
+    return 
ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
+        .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
+        
.setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
+        
.setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
+        .setBandwidth(rand.nextInt(1000)).build();
   }
 
   private void assertSetEquals(Set<String> expected, Set<String> actual) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 5467de0..fd3c671 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
@@ -39,7 +37,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -48,22 +45,24 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
@@ -416,7 +415,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
   }
 
   protected boolean isPeerEnabled() {
-    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
+    return ctx.getReplicationPeer().isPeerEnabled();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index e0c45d5..7bceb78 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -28,8 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -51,11 +50,11 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, 
Server server,
-      String peerClusterZnode, UUID clusterId, ReplicationEndpoint 
replicationEndpoint,
-      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) 
throws IOException {
-    super.init(conf, fs, manager, queueStorage, replicationPeers, server, 
peerClusterZnode,
-      clusterId, replicationEndpoint, walFileLengthProvider, metrics);
+      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
+      String peerClusterZnode, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
+      MetricsSource metrics) throws IOException {
+    super.init(conf, fs, manager, queueStorage, replicationPeer, server, 
peerClusterZnode,
+      clusterId, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8250992..ffed88d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -38,14 +38,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
@@ -82,7 +84,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
   // per group queue size, keep no more than this number of logs in each wal 
group
   protected int queueSizePerGroup;
   protected ReplicationQueueStorage queueStorage;
-  private ReplicationPeers replicationPeers;
+  private ReplicationPeer replicationPeer;
 
   protected Configuration conf;
   protected ReplicationQueueInfo replicationQueueInfo;
@@ -110,8 +112,10 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
   private volatile boolean sourceRunning = false;
   // Metrics for this source
   private MetricsSource metrics;
-  //WARN threshold for the number of queued logs, defaults to 2
+  // WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
+  // whether the replication endpoint has been initialized
+  private volatile boolean endpointInitialized = false;
   // ReplicationEndpoint which will handle the actual replication
   private ReplicationEndpoint replicationEndpoint;
   // A filter (or a chain of filters) for the WAL entries.
@@ -133,22 +137,19 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
 
   /**
    * Instantiation method used by region servers
-   *
    * @param conf configuration to use
    * @param fs file system to use
    * @param manager replication manager to ping to
    * @param server the server for this region server
    * @param peerClusterZnode the name of our znode
    * @param clusterId unique UUID for the cluster
-   * @param replicationEndpoint the replication endpoint implementation
    * @param metrics metrics for replication source
-   * @throws IOException
    */
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, 
Server server,
-      String peerClusterZnode, UUID clusterId, ReplicationEndpoint 
replicationEndpoint,
-      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) 
throws IOException {
+      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
+      String peerClusterZnode, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
+      MetricsSource metrics) throws IOException {
     this.server = server;
     this.conf = HBaseConfiguration.create(conf);
     this.waitOnEndpointSeconds =
@@ -160,7 +161,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 
minutes @ 1 sec per
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 
32);
     this.queueStorage = queueStorage;
-    this.replicationPeers = replicationPeers;
+    this.replicationPeer = replicationPeer;
     this.manager = manager;
     this.fs = fs;
     this.metrics = metrics;
@@ -171,7 +172,6 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = 
this.conf.getInt("replication.source.log.queue.warn", 2);
-    this.replicationEndpoint = replicationEndpoint;
 
     defaultBandwidth = 
this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     currentBandwidth = getCurrentBandwidth();
@@ -196,7 +196,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     if (queue == null) {
       queue = new PriorityBlockingQueue<>(queueSizePerGroup, new 
LogsComparator());
       queues.put(logPrefix, queue);
-      if (this.sourceRunning) {
+      if (this.isSourceActive() && this.endpointInitialized) {
         // new wal group observed after source startup, start a new worker 
thread to track it
         // notice: it's possible that log enqueued when this.running is set 
but worker thread
         // still not launched, so it's necessary to check workerThreads before 
start the worker
@@ -222,7 +222,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
       // A peerId will not have "-" in its name, see HBASE-11394
       peerId = peerClusterZnode.split("-")[0];
     }
-    Map<TableName, List<String>> tableCFMap = 
replicationPeers.getPeer(peerId).getTableCFs();
+    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
     if (tableCFMap != null) {
       List<String> tableCfs = tableCFMap.get(tableName);
       if (tableCFMap.containsKey(tableName)
@@ -241,21 +241,59 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     }
   }
 
+  private void initAndStartReplicationEndpoint() throws Exception {
+    RegionServerCoprocessorHost rsServerHost = null;
+    TableDescriptors tableDescriptors = null;
+    if (server instanceof HRegionServer) {
+      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
+      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
+    }
+    String replicationEndpointImpl = 
replicationPeer.getPeerConfig().getReplicationEndpointImpl();
+    if (replicationEndpointImpl == null) {
+      // Default to HBase inter-cluster replication endpoint
+      replicationEndpointImpl = 
HBaseInterClusterReplicationEndpoint.class.getName();
+    }
+    replicationEndpoint =
+        
Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
+    if (rsServerHost != null) {
+      ReplicationEndpoint newReplicationEndPoint =
+          rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
+      if (newReplicationEndPoint != null) {
+        // Override the newly created endpoint from the hook with configured 
end point
+        replicationEndpoint = newReplicationEndPoint;
+      }
+    }
+    replicationEndpoint
+        .init(new ReplicationEndpoint.Context(conf, 
replicationPeer.getConfiguration(), fs, peerId,
+            clusterId, replicationPeer, metrics, tableDescriptors, server));
+    replicationEndpoint.start();
+    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
+  }
+
   @Override
   public void run() {
     // mark we are running now
     this.sourceRunning = true;
-    try {
-      // start the endpoint, connect to the cluster
-      this.replicationEndpoint.start();
-      this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, 
TimeUnit.SECONDS);
-    } catch (Exception ex) {
-      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
-      uninitialize();
-      throw new RuntimeException(ex);
-    }
 
     int sleepMultiplier = 1;
+    while (this.isSourceActive()) {
+      try {
+        initAndStartReplicationEndpoint();
+        break;
+      } catch (Exception e) {
+        LOG.warn("Error starting ReplicationEndpoint, retrying", e);
+        if (replicationEndpoint != null) {
+          replicationEndpoint.stop();
+          replicationEndpoint = null;
+        }
+        if (sleepForRetries("Error starting ReplicationEndpoint", 
sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+    this.endpointInitialized = true;
+
+    sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
     while (this.isSourceActive() && this.peerClusterId == null) {
       this.peerClusterId = replicationEndpoint.getPeerUUID();
@@ -288,8 +326,8 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
 
   private void initializeWALEntryFilter() {
     // get the WALEntryFilter from ReplicationEndpoint and add it to default 
filters
-    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
-      (WALEntryFilter)new SystemTableWALEntryFilter());
+    ArrayList<WALEntryFilter> filters =
+      Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
     WALEntryFilter filterFromEndpoint = 
this.replicationEndpoint.getWALEntryfilter();
     if (filterFromEndpoint != null) {
       filters.add(filterFromEndpoint);
@@ -309,7 +347,6 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
       worker.startup(getUncaughtExceptionHandler());
       worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, 
queue,
         worker.getStartPosition()));
-      workerThreads.put(walGroupId, worker);
     }
   }
 
@@ -370,25 +407,11 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
   }
 
   private long getCurrentBandwidth() {
-    ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId);
-    long peerBandwidth = replicationPeer != null ? 
replicationPeer.getPeerBandwidth() : 0;
+    long peerBandwidth = replicationPeer.getPeerBandwidth();
     // user can set peer bandwidth to 0 to use default bandwidth
     return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
   }
 
-  private void uninitialize() {
-    LOG.debug("Source exiting " + this.peerId);
-    metrics.clear();
-    if (this.replicationEndpoint.isRunning() || 
this.replicationEndpoint.isStarting()) {
-      this.replicationEndpoint.stop();
-      try {
-        this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, 
TimeUnit.SECONDS);
-      } catch (TimeoutException e) {
-        LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " 
seconds.");
-      }
-    }
-  }
-
   /**
    * Do the sleeping logic
    * @param msg Why we sleep
@@ -410,12 +433,11 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
 
   /**
    * check whether the peer is enabled or not
-   *
    * @return true if the peer is enabled, otherwise false
    */
   @Override
   public boolean isPeerEnabled() {
-    return this.replicationPeers.isPeerEnabled(this.peerId);
+    return replicationPeer.isPeerEnabled();
   }
 
   @Override
@@ -427,8 +449,8 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
         LOG.error("Unexpected exception in ReplicationSource", e);
       }
     };
-    Threads
-        .setDaemonThreadRunning(this, n + ".replicationSource," + 
this.peerClusterZnode, handler);
+    Threads.setDaemonThreadRunning(this, n + ".replicationSource," + 
this.peerClusterZnode,
+      handler);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 4b9ed74..4f10c73 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -50,9 +50,9 @@ public interface ReplicationSourceInterface {
    * @param server the server for this region server
    */
   void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, 
Server server,
-      String peerClusterZnode, UUID clusterId, ReplicationEndpoint 
replicationEndpoint,
-      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) 
throws IOException;
+      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
+      String peerClusterZnode, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
+      MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index bd0ec77..91ed98c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -49,13 +49,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -501,49 +497,14 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param peerId the id of the peer cluster
    * @return the created source
    */
-  private ReplicationSourceInterface getReplicationSource(String peerId, 
ReplicationPeer peer)
-      throws IOException {
-    RegionServerCoprocessorHost rsServerHost = null;
-    TableDescriptors tableDescriptors = null;
-    if (server instanceof HRegionServer) {
-      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
-      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
-    }
-
+  private ReplicationSourceInterface getReplicationSource(String peerId,
+      ReplicationPeer replicationPeer) throws IOException {
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, 
peerId);
 
-    ReplicationEndpoint replicationEndpoint = null;
-    try {
-      String replicationEndpointImpl = 
peer.getPeerConfig().getReplicationEndpointImpl();
-      if (replicationEndpointImpl == null) {
-        // Default to HBase inter-cluster replication endpoint
-        replicationEndpointImpl = 
HBaseInterClusterReplicationEndpoint.class.getName();
-      }
-      replicationEndpoint = Class.forName(replicationEndpointImpl)
-          .asSubclass(ReplicationEndpoint.class).newInstance();
-      if (rsServerHost != null) {
-        ReplicationEndpoint newReplicationEndPoint =
-            rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
-        if (newReplicationEndPoint != null) {
-          // Override the newly created endpoint from the hook with configured 
end point
-          replicationEndpoint = newReplicationEndPoint;
-        }
-      }
-    } catch (Exception e) {
-      LOG.warn("Passed replication endpoint implementation throws errors" +
-        " while initializing ReplicationSource for peer: " + peerId, e);
-      throw new IOException(e);
-    }
-
     MetricsSource metrics = new MetricsSource(peerId);
     // init replication source
-    src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, 
clusterId,
-      replicationEndpoint, walFileLengthProvider, metrics);
-
-    // init replication endpoint
-    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, 
peer.getConfiguration(), fs,
-        peerId, clusterId, peer, metrics, tableDescriptors, server));
-
+    src.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, 
clusterId,
+      walFileLengthProvider, metrics);
     return src;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index cc57dfb..b5a50c0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -44,7 +44,6 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -75,7 +74,6 @@ public class TestAsyncReplicationAdminApi extends 
TestAsyncAdminBase {
     
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 
120000);
     
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
     TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
-    TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
     TEST_UTIL.startMiniCluster();
     ASYNC_CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index e38b9bd..772a9d6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -50,7 +50,6 @@ import 
org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterR
 import 
org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -90,7 +89,6 @@ public class TestReplicationAdmin {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
-    TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
     TEST_UTIL.startMiniCluster();
     admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
     hbaseAdmin = TEST_UTIL.getAdmin();

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 14c5e56..38ec598 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,9 +49,9 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
-      ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String 
peerClusterId,
-      UUID clusterId, ReplicationEndpoint replicationEndpoint,
-      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) 
throws IOException {
+      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String 
peerClusterId,
+      UUID clusterId, WALFileLengthProvider walFileLengthProvider, 
MetricsSource metrics)
+      throws IOException {
     this.manager = manager;
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index ed181dd..b98ca7f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -162,15 +162,14 @@ public class TestReplicationSource {
       }
     };
     replicationEndpoint.start();
-    ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
     ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
     Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
     Configuration testConf = HBaseConfiguration.create();
     testConf.setInt("replication.source.maxretriesmultiplier", 1);
     ReplicationSourceManager manager = 
Mockito.mock(ReplicationSourceManager.class);
     Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-    source.init(testConf, null, manager, null, mockPeers, null, "testPeer", 
null,
-      replicationEndpoint, p -> OptionalLong.empty(), null);
+    source.init(testConf, null, manager, null, mockPeer, null, "testPeer", 
null,
+      p -> OptionalLong.empty(), null);
     ExecutorService executor = Executors.newSingleThreadExecutor();
     Future<?> future = executor.submit(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3be13975/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 77b2fb2..ffa889a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -736,9 +736,9 @@ public abstract class TestReplicationSourceManager {
 
     @Override
     public void init(Configuration conf, FileSystem fs, 
ReplicationSourceManager manager,
-        ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String 
peerClusterId,
-        UUID clusterId, ReplicationEndpoint replicationEndpoint,
-        WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) 
throws IOException {
+        ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String 
peerClusterId,
+        UUID clusterId, WALFileLengthProvider walFileLengthProvider, 
MetricsSource metrics)
+        throws IOException {
       throw new IOException("Failing deliberately");
     }
   }

Reply via email to