HBASE-19707 Race in start and terminate of a replication source after we async 
start replication endpoint


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85693bc2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85693bc2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85693bc2

Branch: refs/heads/HBASE-19397-branch-2
Commit: 85693bc2874f1c316f110a2ff9b5325649377681
Parents: 789fe92
Author: zhangduo <zhang...@apache.org>
Authored: Fri Jan 5 18:28:44 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Jan 30 09:29:52 2018 +0800

----------------------------------------------------------------------
 .../RecoveredReplicationSource.java             |  16 +-
 .../regionserver/ReplicationSource.java         | 203 ++++++++++---------
 2 files changed, 116 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/85693bc2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 1be9a88..3cae0f2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -68,7 +68,7 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
       LOG.debug("Someone has beat us to start a worker thread for wal group " 
+ walGroupId);
     } else {
       LOG.debug("Starting up worker for wal group " + walGroupId);
-      worker.startup(getUncaughtExceptionHandler());
+      worker.startup(this::uncaughtException);
       worker.setWALReader(
         startNewWALReader(worker.getName(), walGroupId, queue, 
worker.getStartPosition()));
       workerThreads.put(walGroupId, worker);
@@ -76,13 +76,13 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
   }
 
   @Override
-  protected ReplicationSourceWALReader startNewWALReader(String threadName,
-      String walGroupId, PriorityBlockingQueue<Path> queue, long 
startPosition) {
-    ReplicationSourceWALReader walReader = new 
RecoveredReplicationSourceWALReader(fs,
-        conf, queue, startPosition, walEntryFilter, this);
-    Threads.setDaemonThreadRunning(walReader, threadName
-        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," 
+ queueId,
-      getUncaughtExceptionHandler());
+  protected ReplicationSourceWALReader startNewWALReader(String threadName, 
String walGroupId,
+      PriorityBlockingQueue<Path> queue, long startPosition) {
+    ReplicationSourceWALReader walReader =
+      new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this);
+    Threads.setDaemonThreadRunning(walReader,
+      threadName + ".replicationSource.replicationWALReaderThread." + 
walGroupId + "," + queueId,
+      this::uncaughtException);
     return walReader;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/85693bc2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0092251..09b6cc1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -75,7 +75,7 @@ import 
org.apache.hbase.thirdparty.com.google.common.collect.Lists;
  * </p>
  */
 @InterfaceAudience.Private
-public class ReplicationSource extends Thread implements 
ReplicationSourceInterface {
+public class ReplicationSource implements ReplicationSourceInterface {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationSource.class);
   // Queues of logs to process, entry in format of walGroupId->queue,
@@ -114,10 +114,8 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
   private MetricsSource metrics;
   // WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
-  // whether the replication endpoint has been initialized
-  private volatile boolean endpointInitialized = false;
   // ReplicationEndpoint which will handle the actual replication
-  private ReplicationEndpoint replicationEndpoint;
+  private volatile ReplicationEndpoint replicationEndpoint;
   // A filter (or a chain of filters) for the WAL entries.
   protected WALEntryFilter walEntryFilter;
   // throttler
@@ -135,6 +133,8 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
   public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
   private int waitOnEndpointSeconds = -1;
 
+  private Thread initThread;
+
   /**
    * Instantiation method used by region servers
    * @param conf configuration to use
@@ -196,7 +196,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     if (queue == null) {
       queue = new PriorityBlockingQueue<>(queueSizePerGroup, new 
LogsComparator());
       queues.put(logPrefix, queue);
-      if (this.isSourceActive() && this.endpointInitialized) {
+      if (this.isSourceActive() && this.replicationEndpoint != null) {
         // new wal group observed after source startup, start a new worker 
thread to track it
         // notice: it's possible that log enqueued when this.running is set 
but worker thread
         // still not launched, so it's necessary to check workerThreads before 
start the worker
@@ -235,28 +235,36 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     }
   }
 
-  private void initAndStartReplicationEndpoint() throws Exception {
+  private ReplicationEndpoint createReplicationEndpoint()
+      throws InstantiationException, IllegalAccessException, 
ClassNotFoundException, IOException {
     RegionServerCoprocessorHost rsServerHost = null;
-    TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
-      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
     }
     String replicationEndpointImpl = 
replicationPeer.getPeerConfig().getReplicationEndpointImpl();
     if (replicationEndpointImpl == null) {
       // Default to HBase inter-cluster replication endpoint
       replicationEndpointImpl = 
HBaseInterClusterReplicationEndpoint.class.getName();
     }
-    replicationEndpoint =
-        
Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
+    ReplicationEndpoint replicationEndpoint =
+      
Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
     if (rsServerHost != null) {
       ReplicationEndpoint newReplicationEndPoint =
-          rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
+        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
       if (newReplicationEndPoint != null) {
         // Override the newly created endpoint from the hook with configured 
end point
         replicationEndpoint = newReplicationEndPoint;
       }
     }
+    return replicationEndpoint;
+  }
+
+  private void initAndStartReplicationEndpoint(ReplicationEndpoint 
replicationEndpoint)
+      throws IOException, TimeoutException {
+    TableDescriptors tableDescriptors = null;
+    if (server instanceof HRegionServer) {
+      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
+    }
     replicationEndpoint
         .init(new ReplicationEndpoint.Context(conf, 
replicationPeer.getConfiguration(), fs, peerId,
             clusterId, replicationPeer, metrics, tableDescriptors, server));
@@ -264,60 +272,6 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
   }
 
-  @Override
-  public void run() {
-    // mark we are running now
-    this.sourceRunning = true;
-
-    int sleepMultiplier = 1;
-    while (this.isSourceActive()) {
-      try {
-        initAndStartReplicationEndpoint();
-        break;
-      } catch (Exception e) {
-        LOG.warn("Error starting ReplicationEndpoint, retrying", e);
-        if (replicationEndpoint != null) {
-          replicationEndpoint.stop();
-          replicationEndpoint = null;
-        }
-        if (sleepForRetries("Error starting ReplicationEndpoint", 
sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-    }
-    this.endpointInitialized = true;
-
-    sleepMultiplier = 1;
-    // delay this until we are in an asynchronous thread
-    while (this.isSourceActive() && this.peerClusterId == null) {
-      this.peerClusterId = replicationEndpoint.getPeerUUID();
-      if (this.isSourceActive() && this.peerClusterId == null) {
-        if (sleepForRetries("Cannot contact the peer's zk ensemble", 
sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-    }
-
-    // In rare case, zookeeper setting may be messed up. That leads to the 
incorrect
-    // peerClusterId value, which is the same as the source clusterId
-    if (clusterId.equals(peerClusterId) && 
!replicationEndpoint.canReplicateToSameCluster()) {
-      this.terminate("ClusterId " + clusterId + " is replicating to itself: 
peerClusterId "
-          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
-          + replicationEndpoint.getClass().getName(), null, false);
-      this.manager.removeSource(this);
-      return;
-    }
-    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
-
-    initializeWALEntryFilter();
-    // start workers
-    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : 
queues.entrySet()) {
-      String walGroupId = entry.getKey();
-      PriorityBlockingQueue<Path> queue = entry.getValue();
-      tryStartNewShipper(walGroupId, queue);
-    }
-  }
-
   private void initializeWALEntryFilter() {
     // get the WALEntryFilter from ReplicationEndpoint and add it to default 
filters
     ArrayList<WALEntryFilter> filters =
@@ -331,37 +285,31 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
   }
 
   protected void tryStartNewShipper(String walGroupId, 
PriorityBlockingQueue<Path> queue) {
-    final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
-        walGroupId, queue, this);
+    ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, 
walGroupId, queue, this);
     ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, 
worker);
     if (extant != null) {
       LOG.debug("Someone has beat us to start a worker thread for wal group " 
+ walGroupId);
     } else {
       LOG.debug("Starting up worker for wal group " + walGroupId);
-      worker.startup(getUncaughtExceptionHandler());
-      worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, 
queue,
-        worker.getStartPosition()));
+      worker.startup(this::uncaughtException);
+      worker.setWALReader(
+        startNewWALReader(worker.getName(), walGroupId, queue, 
worker.getStartPosition()));
     }
   }
 
   protected ReplicationSourceWALReader startNewWALReader(String threadName, 
String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
     ReplicationSourceWALReader walReader =
-        new ReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this);
+      new ReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this);
     return (ReplicationSourceWALReader) 
Threads.setDaemonThreadRunning(walReader,
       threadName + ".replicationSource.wal-reader." + walGroupId + "," + 
queueId,
-      getUncaughtExceptionHandler());
+      this::uncaughtException);
   }
 
-  public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
-    return new Thread.UncaughtExceptionHandler() {
-      @Override
-      public void uncaughtException(final Thread t, final Throwable e) {
-        RSRpcServices.exitIfOOME(e);
-        LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + 
getCurrentPath(), e);
-        server.stop("Unexpected exception in " + t.getName());
-      }
-    };
+  protected final void uncaughtException(Thread t, Throwable e) {
+    RSRpcServices.exitIfOOME(e);
+    LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + 
getCurrentPath(), e);
+    server.abort("Unexpected exception in " + t.getName(), e);
   }
 
   @Override
@@ -434,17 +382,76 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     return replicationPeer.isPeerEnabled();
   }
 
+  private void initialize() {
+    int sleepMultiplier = 1;
+    while (this.isSourceActive()) {
+      ReplicationEndpoint replicationEndpoint;
+      try {
+        replicationEndpoint = createReplicationEndpoint();
+      } catch (Exception e) {
+        LOG.warn("error creating ReplicationEndpoint, retry", e);
+        if (sleepForRetries("Error creating ReplicationEndpoint", 
sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+
+      try {
+        initAndStartReplicationEndpoint(replicationEndpoint);
+        this.replicationEndpoint = replicationEndpoint;
+        break;
+      } catch (Exception e) {
+        LOG.warn("Error starting ReplicationEndpoint, retry", e);
+        replicationEndpoint.stop();
+        if (sleepForRetries("Error starting ReplicationEndpoint", 
sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+
+    if (!this.isSourceActive()) {
+      return;
+    }
+
+    sleepMultiplier = 1;
+    // delay this until we are in an asynchronous thread
+    while (this.isSourceActive() && this.peerClusterId == null) {
+      this.peerClusterId = replicationEndpoint.getPeerUUID();
+      if (this.isSourceActive() && this.peerClusterId == null) {
+        if (sleepForRetries("Cannot contact the peer's zk ensemble", 
sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+
+    // In rare case, zookeeper setting may be messed up. That leads to the 
incorrect
+    // peerClusterId value, which is the same as the source clusterId
+    if (clusterId.equals(peerClusterId) && 
!replicationEndpoint.canReplicateToSameCluster()) {
+      this.terminate("ClusterId " + clusterId + " is replicating to itself: 
peerClusterId "
+          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+          + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.removeSource(this);
+      return;
+    }
+    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+
+    initializeWALEntryFilter();
+    // start workers
+    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : 
queues.entrySet()) {
+      String walGroupId = entry.getKey();
+      PriorityBlockingQueue<Path> queue = entry.getValue();
+      tryStartNewShipper(walGroupId, queue);
+    }
+  }
+
   @Override
   public void startup() {
-    String n = Thread.currentThread().getName();
-    Thread.UncaughtExceptionHandler handler = new 
Thread.UncaughtExceptionHandler() {
-      @Override
-      public void uncaughtException(final Thread t, final Throwable e) {
-        LOG.error("Unexpected exception in ReplicationSource", e);
-      }
-    };
-    Threads.setDaemonThreadRunning(this, n + ".replicationSource," + 
this.queueId,
-      handler);
+    // mark we are running now
+    this.sourceRunning = true;
+    initThread = new Thread(this::initialize);
+    Threads.setDaemonThreadRunning(initThread,
+      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
+      this::uncaughtException);
   }
 
   @Override
@@ -465,6 +472,13 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
         cause);
     }
     this.sourceRunning = false;
+    if (initThread != null && Thread.currentThread() != initThread) {
+      // This usually won't happen but anyway, let's wait until the 
initialization thread exits.
+      // And notice that we may call terminate directly from the initThread so 
here we need to
+      // avoid join on ourselves.
+      initThread.interrupt();
+      Threads.shutdown(initThread, this.sleepForRetries);
+    }
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
     for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
@@ -481,12 +495,11 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
       }
       if (this.replicationEndpoint != null) {
         try {
-          this.replicationEndpoint
-              .awaitTerminated(sleepForRetries * maxRetriesMultiplier, 
TimeUnit.MILLISECONDS);
+          this.replicationEndpoint.awaitTerminated(sleepForRetries * 
maxRetriesMultiplier,
+            TimeUnit.MILLISECONDS);
         } catch (TimeoutException te) {
-          LOG.warn("Got exception while waiting for endpoint to shutdown for 
replication source :"
-              + this.queueId,
-            te);
+          LOG.warn("Got exception while waiting for endpoint to shutdown for 
replication source :" +
+            this.queueId, te);
         }
       }
     }

Reply via email to