Repository: hbase Updated Branches: refs/heads/branch-1.2 a2617b00c -> 96e48c3df
HBASE-18192: Replication drops recovered queues on region server shutdown (Ashu Pachauri) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/96e48c3d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/96e48c3d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/96e48c3d Branch: refs/heads/branch-1.2 Commit: 96e48c3df597fc1450546818e2bd34cfc1fd5c10 Parents: a2617b0 Author: tedyu <yuzhih...@gmail.com> Authored: Fri Jun 9 20:03:20 2017 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Fri Jun 9 20:03:20 2017 -0700 ---------------------------------------------------------------------- .../hbase/regionserver/HRegionServer.java | 3 +- .../regionserver/ReplicationSource.java | 42 ++++-- .../replication/TestReplicationSource.java | 128 ++++++++++++++++++- 3 files changed, 161 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 129b5a7..4ae0286 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2213,7 +2213,8 @@ public class HRegionServer extends HasThread implements * @return Return the object that implements the replication * source service. 
*/ - ReplicationSourceService getReplicationSourceService() { + @VisibleForTesting + public ReplicationSourceService getReplicationSourceService() { return replicationSourceHandler; } http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2d5dedd..2285a5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -138,6 +138,12 @@ public class ReplicationSource extends Thread private AtomicInteger logQueueSize = new AtomicInteger(0); private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads = new ConcurrentHashMap<String, ReplicationSourceWorkerThread>(); + // Hold the state of a replication worker thread + public enum WorkerState { + RUNNING, + STOPPED, + FINISHED // The worker is done processing a recovered queue + } /** * Instantiation method used by region servers @@ -362,7 +368,7 @@ public class ReplicationSource extends Thread this.sourceRunning = false; Collection<ReplicationSourceWorkerThread> workers = workerThreads.values(); for (ReplicationSourceWorkerThread worker : workers) { - worker.setWorkerRunning(false); + worker.setWorkerState(WorkerState.STOPPED); worker.interrupt(); } ListenableFuture<Service.State> future = null; @@ -477,8 +483,8 @@ public class ReplicationSource extends Thread private int currentNbOperations = 0; // Current size of data we need to replicate private int currentSize = 0; - // Indicates whether this particular worker is running - private boolean workerRunning = 
true; + // Current state of the worker thread + private WorkerState state; public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) { @@ -491,6 +497,7 @@ public class ReplicationSource extends Thread @Override public void run() { + setWorkerState(WorkerState.RUNNING); // If this is recovered, the queue is already full and the first log // normally has a position (unless the RS failed between 2 logs) if (this.replicationQueueInfo.isQueueRecovered()) { @@ -623,13 +630,13 @@ public class ReplicationSource extends Thread sleepMultiplier = 1; shipEdits(currentWALisBeingWrittenTo, entries); } - if (replicationQueueInfo.isQueueRecovered()) { + if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) { // use synchronize to make sure one last thread will clean the queue synchronized (workerThreads) { Threads.sleep(100);// wait a short while for other worker thread to fully exit boolean allOtherTaskDone = true; for (ReplicationSourceWorkerThread worker : workerThreads.values()) { - if (!worker.equals(this) && worker.isAlive()) { + if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) { allOtherTaskDone = false; break; } @@ -641,6 +648,10 @@ public class ReplicationSource extends Thread } } } + // If the worker exits run loop without finishing its task, mark it as stopped.
+ if (state != WorkerState.FINISHED) { + setWorkerState(WorkerState.STOPPED); + } } /** @@ -1023,7 +1034,7 @@ public class ReplicationSource extends Thread LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " + peerClusterZnode); metrics.incrCompletedRecoveryQueue(); - workerRunning = false; + setWorkerState(WorkerState.FINISHED); return true; } return false; @@ -1054,7 +1065,7 @@ public class ReplicationSource extends Thread } private boolean isWorkerActive() { - return !stopper.isStopped() && workerRunning && !isInterrupted(); + return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted(); } private void terminate(String reason, Exception cause) { @@ -1065,13 +1076,26 @@ public class ReplicationSource extends Thread LOG.error("Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason, cause); } + setWorkerState(WorkerState.STOPPED); this.interrupt(); Threads.shutdown(this, sleepForRetries); LOG.info("ReplicationSourceWorker " + this.getName() + " terminated"); } - public void setWorkerRunning(boolean workerRunning) { - this.workerRunning = workerRunning; + /** + * Set the worker state + * @param state + */ + public void setWorkerState(WorkerState state) { + this.state = state; + } + + /** + * Get the current state of this worker. 
+ * @return WorkerState + */ + public WorkerState getWorkerState() { + return state; } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/96e48c3d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 9bf0e93..8c597fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -18,9 +18,11 @@ */ package org.apache.hadoop.hbase.replication; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -30,6 +32,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.Predicate; @@ -37,6 +41,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import 
org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALProvider; @@ -46,7 +54,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; + +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -60,10 +69,12 @@ public class TestReplicationSource { LogFactory.getLog(TestReplicationSource.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static HBaseTestingUtility TEST_UTIL_PEER = + new HBaseTestingUtility(); private static FileSystem FS; private static Path oldLogDir; private static Path logDir; - private static Configuration conf = HBaseConfiguration.create(); + private static Configuration conf = TEST_UTIL.getConfiguration(); /** * @throws java.lang.Exception @@ -79,6 +90,13 @@ public class TestReplicationSource { if (FS.exists(logDir)) FS.delete(logDir, true); } + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL_PEER.shutdownMiniHBaseCluster(); + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL.shutdownMiniDFSCluster(); + } + /** * Sanity check that we can move logs around while we are reading * from them. Should this test fail, ReplicationSource would have a hard @@ -165,5 +183,111 @@ public class TestReplicationSource { } + /** + * Tests that recovered queues are preserved on a regionserver shutdown. 
* See HBASE-18192 + * @throws Exception + */ + @Test + public void testServerShutdownRecoveredQueue() throws Exception { + try { + // Ensure single-threaded WAL + conf.set("hbase.wal.provider", "defaultProvider"); + conf.setInt("replication.sleep.before.failover", 2000); + // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. + conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); + MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); + TEST_UTIL_PEER.startMiniCluster(1); + + HRegionServer serverA = cluster.getRegionServer(0); + final ReplicationSourceManager managerA = + ((Replication) serverA.getReplicationSourceService()).getReplicationManager(); + HRegionServer serverB = cluster.getRegionServer(1); + final ReplicationSourceManager managerB = + ((Replication) serverB.getReplicationSourceService()).getReplicationManager(); + final ReplicationAdmin replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); + + final String peerId = "TestPeer"; + replicationAdmin.addPeer(peerId, + new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()), null); + // Wait for replication sources to come up + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override public boolean evaluate() throws Exception { + return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); + } + }); + // Disabling peer makes sure there is at least one log to claim when the server dies + // The recovered queue will also stay there until the peer is re-enabled, even if the + // WALs it contains have no data. + replicationAdmin.disablePeer(peerId); + + // Stopping serverA + // Its queues should be claimed by the only other alive server i.e.
serverB + cluster.stopRegionServer(serverA.getServerName()); + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override public boolean evaluate() throws Exception { + return managerB.getOldSources().size() == 1; + } + }); + + final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); + serverC.waitForServerOnline(); + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override public boolean evaluate() throws Exception { + return serverC.getReplicationSourceService() != null; + } + }); + final ReplicationSourceManager managerC = + ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); + // Sanity check + assertEquals(0, managerC.getOldSources().size()); + + // Stopping serverB + // Now serverC should have two recovered queues: + // 1. The serverB's normal queue + // 2. serverA's recovered queue on serverB + cluster.stopRegionServer(serverB.getServerName()); + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override public boolean evaluate() throws Exception { + return managerC.getOldSources().size() == 2; + } + }); + replicationAdmin.enablePeer(peerId); + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override public boolean evaluate() throws Exception { + return managerC.getOldSources().size() == 0; + } + }); + } finally { + conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); + } + } + + /** + * Regionserver implementation that adds a delay on the graceful shutdown. + */ + public static class ShutdownDelayRegionServer extends HRegionServer { + public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm) + throws IOException, InterruptedException { + super(conf, csm); + } + + @Override + protected void stopServiceThreads() { + // Add a delay before service threads are shutdown. 
+ // This will keep the zookeeper connection alive for the duration of the delay. + LOG.info("Adding a delay to the regionserver shutdown"); + try { + Thread.sleep(2000); + } catch (InterruptedException ex) { + LOG.error("Interrupted while sleeping"); + } + super.stopServiceThreads(); + } + } }