HBASE-19707 Race in start and terminate of a replication source after we async start replication endpoint
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85693bc2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85693bc2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85693bc2 Branch: refs/heads/HBASE-19397-branch-2 Commit: 85693bc2874f1c316f110a2ff9b5325649377681 Parents: 789fe92 Author: zhangduo <zhang...@apache.org> Authored: Fri Jan 5 18:28:44 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Jan 30 09:29:52 2018 +0800 ---------------------------------------------------------------------- .../RecoveredReplicationSource.java | 16 +- .../regionserver/ReplicationSource.java | 203 ++++++++++--------- 2 files changed, 116 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/85693bc2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 1be9a88..3cae0f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -68,7 +68,7 @@ public class RecoveredReplicationSource extends ReplicationSource { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(getUncaughtExceptionHandler()); + worker.startup(this::uncaughtException); worker.setWALReader( startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); 
workerThreads.put(walGroupId, worker); @@ -76,13 +76,13 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - protected ReplicationSourceWALReader startNewWALReader(String threadName, - String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) { - ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs, - conf, queue, startPosition, walEntryFilter, this); - Threads.setDaemonThreadRunning(walReader, threadName - + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId, - getUncaughtExceptionHandler()); + protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + PriorityBlockingQueue<Path> queue, long startPosition) { + ReplicationSourceWALReader walReader = + new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + Threads.setDaemonThreadRunning(walReader, + threadName + ".replicationSource.replicationWALReaderThread." 
+ walGroupId + "," + queueId, + this::uncaughtException); return walReader; } http://git-wip-us.apache.org/repos/asf/hbase/blob/85693bc2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 0092251..09b6cc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -75,7 +75,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; * </p> */ @InterfaceAudience.Private -public class ReplicationSource extends Thread implements ReplicationSourceInterface { +public class ReplicationSource implements ReplicationSourceInterface { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); // Queues of logs to process, entry in format of walGroupId->queue, @@ -114,10 +114,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private MetricsSource metrics; // WARN threshold for the number of queued logs, defaults to 2 private int logQueueWarnThreshold; - // whether the replication endpoint has been initialized - private volatile boolean endpointInitialized = false; // ReplicationEndpoint which will handle the actual replication - private ReplicationEndpoint replicationEndpoint; + private volatile ReplicationEndpoint replicationEndpoint; // A filter (or a chain of filters) for the WAL entries. 
protected WALEntryFilter walEntryFilter; // throttler @@ -135,6 +133,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30; private int waitOnEndpointSeconds = -1; + private Thread initThread; + /** * Instantiation method used by region servers * @param conf configuration to use @@ -196,7 +196,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); queues.put(logPrefix, queue); - if (this.isSourceActive() && this.endpointInitialized) { + if (this.isSourceActive() && this.replicationEndpoint != null) { // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker @@ -235,28 +235,36 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } } - private void initAndStartReplicationEndpoint() throws Exception { + private ReplicationEndpoint createReplicationEndpoint() + throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { RegionServerCoprocessorHost rsServerHost = null; - TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); - tableDescriptors = ((HRegionServer) server).getTableDescriptors(); } String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl(); if (replicationEndpointImpl == null) { // Default to HBase inter-cluster replication endpoint replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); } - replicationEndpoint = - Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance(); + 
ReplicationEndpoint replicationEndpoint = + Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance(); if (rsServerHost != null) { ReplicationEndpoint newReplicationEndPoint = - rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); + rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); if (newReplicationEndPoint != null) { // Override the newly created endpoint from the hook with configured end point replicationEndpoint = newReplicationEndPoint; } } + return replicationEndpoint; + } + + private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) + throws IOException, TimeoutException { + TableDescriptors tableDescriptors = null; + if (server instanceof HRegionServer) { + tableDescriptors = ((HRegionServer) server).getTableDescriptors(); + } replicationEndpoint .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server)); @@ -264,60 +272,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); } - @Override - public void run() { - // mark we are running now - this.sourceRunning = true; - - int sleepMultiplier = 1; - while (this.isSourceActive()) { - try { - initAndStartReplicationEndpoint(); - break; - } catch (Exception e) { - LOG.warn("Error starting ReplicationEndpoint, retrying", e); - if (replicationEndpoint != null) { - replicationEndpoint.stop(); - replicationEndpoint = null; - } - if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { - sleepMultiplier++; - } - } - } - this.endpointInitialized = true; - - sleepMultiplier = 1; - // delay this until we are in an asynchronous thread - while (this.isSourceActive() && this.peerClusterId == null) { - this.peerClusterId = replicationEndpoint.getPeerUUID(); - if (this.isSourceActive() && this.peerClusterId == null) { 
- if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { - sleepMultiplier++; - } - } - } - - // In rare case, zookeeper setting may be messed up. That leads to the incorrect - // peerClusterId value, which is the same as the source clusterId - if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { - this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " - + peerClusterId + " which is not allowed by ReplicationEndpoint:" - + replicationEndpoint.getClass().getName(), null, false); - this.manager.removeSource(this); - return; - } - LOG.info("Replicating " + clusterId + " -> " + peerClusterId); - - initializeWALEntryFilter(); - // start workers - for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) { - String walGroupId = entry.getKey(); - PriorityBlockingQueue<Path> queue = entry.getValue(); - tryStartNewShipper(walGroupId, queue); - } - } - private void initializeWALEntryFilter() { // get the WALEntryFilter from ReplicationEndpoint and add it to default filters ArrayList<WALEntryFilter> filters = @@ -331,37 +285,31 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) { - final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, - walGroupId, queue, this); + ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); } else { LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(getUncaughtExceptionHandler()); - worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue, - worker.getStartPosition())); + worker.startup(this::uncaughtException); + 
worker.setWALReader( + startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); } } protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) { ReplicationSourceWALReader walReader = - new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader, threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId, - getUncaughtExceptionHandler()); + this::uncaughtException); } - public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { - return new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - RSRpcServices.exitIfOOME(e); - LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); - server.stop("Unexpected exception in " + t.getName()); - } - }; + protected final void uncaughtException(Thread t, Throwable e) { + RSRpcServices.exitIfOOME(e); + LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); + server.abort("Unexpected exception in " + t.getName(), e); } @Override @@ -434,17 +382,76 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf return replicationPeer.isPeerEnabled(); } + private void initialize() { + int sleepMultiplier = 1; + while (this.isSourceActive()) { + ReplicationEndpoint replicationEndpoint; + try { + replicationEndpoint = createReplicationEndpoint(); + } catch (Exception e) { + LOG.warn("error creating ReplicationEndpoint, retry", e); + if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } + + try { + initAndStartReplicationEndpoint(replicationEndpoint); + this.replicationEndpoint = 
replicationEndpoint; + break; + } catch (Exception e) { + LOG.warn("Error starting ReplicationEndpoint, retry", e); + replicationEndpoint.stop(); + if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + + if (!this.isSourceActive()) { + return; + } + + sleepMultiplier = 1; + // delay this until we are in an asynchronous thread + while (this.isSourceActive() && this.peerClusterId == null) { + this.peerClusterId = replicationEndpoint.getPeerUUID(); + if (this.isSourceActive() && this.peerClusterId == null) { + if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { + sleepMultiplier++; + } + } + } + + // In rare case, zookeeper setting may be messed up. That leads to the incorrect + // peerClusterId value, which is the same as the source clusterId + if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) { + this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + + peerClusterId + " which is not allowed by ReplicationEndpoint:" + + replicationEndpoint.getClass().getName(), null, false); + this.manager.removeSource(this); + return; + } + LOG.info("Replicating " + clusterId + " -> " + peerClusterId); + + initializeWALEntryFilter(); + // start workers + for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) { + String walGroupId = entry.getKey(); + PriorityBlockingQueue<Path> queue = entry.getValue(); + tryStartNewShipper(walGroupId, queue); + } + } + @Override public void startup() { - String n = Thread.currentThread().getName(); - Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOG.error("Unexpected exception in ReplicationSource", e); - } - }; - Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId, - handler); + // mark we are running now + this.sourceRunning 
= true; + initThread = new Thread(this::initialize); + Threads.setDaemonThreadRunning(initThread, + Thread.currentThread().getName() + ".replicationSource," + this.queueId, + this::uncaughtException); } @Override @@ -465,6 +472,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf cause); } this.sourceRunning = false; + if (initThread != null && Thread.currentThread() != initThread) { + // This usually won't happen but anyway, let's wait until the initialization thread exits. + // And notice that we may call terminate directly from the initThread so here we need to + // avoid join on ourselves. + initThread.interrupt(); + Threads.shutdown(initThread, this.sleepForRetries); + } Collection<ReplicationSourceShipper> workers = workerThreads.values(); for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); @@ -481,12 +495,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } if (this.replicationEndpoint != null) { try { - this.replicationEndpoint - .awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); + this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, + TimeUnit.MILLISECONDS); } catch (TimeoutException te) { - LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" - + this.queueId, - te); + LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + + this.queueId, te); } } }