Repository: hbase
Updated Branches:
  refs/heads/branch-1 961337aad -> 6e3da5a39


HBASE-18192: Replication drops recovered queues on region server shutdown

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e3da5a3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e3da5a3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e3da5a3

Branch: refs/heads/branch-1
Commit: 6e3da5a39a21c75de5d0dff9edbe767232a20310
Parents: 961337a
Author: Ashu Pachauri <ashu210...@gmail.com>
Authored: Fri Jun 9 14:36:45 2017 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Jun 9 19:54:17 2017 -0700

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  47 +++++--
 .../replication/TestReplicationSource.java      | 126 ++++++++++++++++++-
 2 files changed, 162 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
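
For readers skimming the patch below: the core of the change is to replace the shipper worker's boolean workerRunning flag with a three-valued WorkerState (RUNNING, STOPPED, FINISHED), so that a worker stopped part way through a recovered queue (for example during region server shutdown) is no longer indistinguishable from one that actually drained it; only a FINISHED worker may trigger removal of the recovered queue. A minimal, self-contained Java sketch of that distinction follows (an illustrative stand-in, not the HBase class itself; the names mirror the patch):

// Illustrative stand-in for the state machine this patch introduces.
// It is NOT the HBase class; names mirror the patch for readability.
public class WorkerStateSketch {

  // Mirrors the WorkerState enum added to ReplicationSource.
  enum WorkerState {
    RUNNING,
    STOPPED,   // asked to stop (e.g. region server shutdown) before draining its queue
    FINISHED   // fully drained a recovered queue
  }

  static class Worker {
    private volatile WorkerState state = WorkerState.RUNNING;

    void setWorkerState(WorkerState next) { state = next; }

    // Before the patch this was a plain boolean, so "stopped by shutdown" and
    // "done with the recovered queue" were indistinguishable.
    boolean isWorkerActive() { return state == WorkerState.RUNNING; }

    // Only a FINISHED worker may trigger removal of its recovered queue.
    boolean mayRemoveRecoveredQueue() { return state == WorkerState.FINISHED; }
  }

  public static void main(String[] args) {
    Worker w = new Worker();
    w.setWorkerState(WorkerState.STOPPED);          // simulate a shutdown mid-recovery
    System.out.println(w.mayRemoveRecoveredQueue()); // false: the recovered queue is preserved
  }
}
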


http://git-wip-us.apache.org/repos/asf/hbase/blob/6e3da5a3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 65ea422..6954ea2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -143,6 +143,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
       new ConcurrentHashMap<String, ReplicationSourceShipperThread>();
 
+  // Hold the state of a replication worker thread
+  public enum WorkerState {
+    RUNNING,
+    STOPPED,
+    FINISHED  // The worker is done processing a recovered queue
+  }
+
   private AtomicLong totalBufferUsed;
 
   /**
@@ -399,7 +406,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.sourceRunning = false;
     Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
     for (ReplicationSourceShipperThread worker : workers) {
-      worker.setWorkerRunning(false);
+      worker.setWorkerState(WorkerState.STOPPED);
       worker.entryReader.interrupt();
       worker.interrupt();
     }
@@ -513,8 +520,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     private long lastLoggedPosition = -1;
     // Path of the current log
     private volatile Path currentPath;
-    // Indicates whether this particular worker is running
-    private boolean workerRunning = true;
+    // Current state of the worker thread
+    private WorkerState state;
     ReplicationSourceWALReaderThread entryReader;
     // Use guava cache to set ttl for each key
     private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
@@ -538,6 +545,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
 
     @Override
     public void run() {
+      setWorkerState(WorkerState.RUNNING);
       // Loop until we close down
       while (isWorkerActive()) {
         int sleepMultiplier = 1;
@@ -570,7 +578,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
             LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
                 + peerClusterZnode);
             metrics.incrCompletedRecoveryQueue();
-            setWorkerRunning(false);
+            setWorkerState(WorkerState.FINISHED);
             continue;
           }
         } catch (InterruptedException e) {
@@ -579,13 +587,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         }
       }
 
-      if (replicationQueueInfo.isQueueRecovered()) {
+      if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
         // use synchronize to make sure one last thread will clean the queue
         synchronized (workerThreads) {
           Threads.sleep(100);// wait a short while for other worker thread to fully exit
           boolean allOtherTaskDone = true;
           for (ReplicationSourceShipperThread worker : workerThreads.values()) {
-            if (!worker.equals(this) && worker.isAlive()) {
+            if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
               allOtherTaskDone = false;
               break;
             }
@@ -597,6 +605,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
           }
         }
       }
+      // If the worker exits the run loop without finishing its task, mark it as stopped.
+      if (state != WorkerState.FINISHED) {
+        setWorkerState(WorkerState.STOPPED);
+      }
     }
 
     private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
@@ -927,7 +939,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     }
 
     private boolean isWorkerActive() {
-      return !stopper.isStopped() && workerRunning && !isInterrupted();
+      return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
     }
 
     private void terminate(String reason, Exception cause) {
@@ -940,14 +952,29 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       }
       entryReader.interrupt();
       Threads.shutdown(entryReader, sleepForRetries);
+      setWorkerState(WorkerState.STOPPED);
       this.interrupt();
       Threads.shutdown(this, sleepForRetries);
       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
     }
 
-    public void setWorkerRunning(boolean workerRunning) {
-      entryReader.setReaderRunning(workerRunning);
-      this.workerRunning = workerRunning;
+    /**
+     * Set the worker state
+     * @param state
+     */
+    public void setWorkerState(WorkerState state) {
+      this.state = state;
+      if (entryReader != null) {
+        entryReader.setReaderRunning(state == WorkerState.RUNNING);
+      }
+    }
+
+    /**
+     * Get the current state of this worker.
+     * @return WorkerState
+     */
+    public WorkerState getWorkerState() {
+      return state;
     }
 
     private void releaseBufferQuota(int size) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e3da5a3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index f7e644f..6e6fe9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -31,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.Predicate;
@@ -38,6 +42,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -49,6 +56,7 @@ import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplica
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -63,10 +71,12 @@ public class TestReplicationSource {
       LogFactory.getLog(TestReplicationSource.class);
   private final static HBaseTestingUtility TEST_UTIL =
       new HBaseTestingUtility();
+  private final static HBaseTestingUtility TEST_UTIL_PEER =
+      new HBaseTestingUtility();
   private static FileSystem FS;
   private static Path oldLogDir;
   private static Path logDir;
-  private static Configuration conf = HBaseConfiguration.create();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
 
   /**
    * @throws java.lang.Exception
@@ -82,6 +92,13 @@ public class TestReplicationSource {
     if (FS.exists(logDir)) FS.delete(logDir, true);
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
   /**
    * Sanity check that we can move logs around while we are reading
    * from them. Should this test fail, ReplicationSource would have a hard
@@ -172,5 +189,112 @@ public class TestReplicationSource {
 
   }
 
+  /**
+   * Tests that recovered queues are preserved on a regionserver shutdown.
+   * See HBASE-18192
+   * @throws Exception
+   */
+  @Test
+  public void testServerShutdownRecoveredQueue() throws Exception {
+    try {
+      // Ensure single-threaded WAL
+      conf.set("hbase.wal.provider", "defaultProvider");
+      conf.setInt("replication.sleep.before.failover", 2000);
+      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
+      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
+      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
+      TEST_UTIL_PEER.startMiniCluster(1);
+
+      HRegionServer serverA = cluster.getRegionServer(0);
+      final ReplicationSourceManager managerA =
+          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+      HRegionServer serverB = cluster.getRegionServer(1);
+      final ReplicationSourceManager managerB =
+          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+      final ReplicationAdmin replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
+
+      final String peerId = "TestPeer";
+      replicationAdmin.addPeer(peerId,
+          new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()), null);
+      // Wait for replication sources to come up
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
+        }
+      });
+      // Disabling the peer makes sure there is at least one log to claim when the server dies.
+      // The recovered queue will also stay there while the peer is disabled, even if the
+      // WALs it contains have no data.
+      replicationAdmin.disablePeer(peerId);
+
+      // Stopping serverA
+      // Its queues should be claimed by the only other alive server, i.e. serverB
+      cluster.stopRegionServer(serverA.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerB.getOldSources().size() == 1;
+        }
+      });
+
+      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
+      serverC.waitForServerOnline();
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return serverC.getReplicationSourceService() != null;
+        }
+      });
+      final ReplicationSourceManager managerC =
+          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
+      // Sanity check
+      assertEquals(0, managerC.getOldSources().size());
+
+      // Stopping serverB
+      // Now serverC should have two recovered queues:
+      // 1. serverB's normal queue
+      // 2. serverA's recovered queue on serverB
+      cluster.stopRegionServer(serverB.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 2;
+        }
+      });
+      replicationAdmin.enablePeer(peerId);
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 0;
+        }
+      });
+    } finally {
+      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
+    }
+  }
+
+  /**
+   * Regionserver implementation that adds a delay on the graceful shutdown.
+   */
+  public static class ShutdownDelayRegionServer extends HRegionServer {
+    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
+        throws IOException, InterruptedException {
+      super(conf, csm);
+    }
+
+    @Override
+    protected void stopServiceThreads() {
+      // Add a delay before service threads are shutdown.
+      // This will keep the zookeeper connection alive for the duration of the delay.
+      LOG.info("Adding a delay to the regionserver shutdown");
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ex) {
+        LOG.error("Interrupted while sleeping");
+      }
+      super.stopServiceThreads();
+    }
+  }
+
 }
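
A note on the test above: it leans on Waiter.waitFor with predicates to wait for replication sources and recovered queues to appear and drain, rather than on fixed sleeps. A simplified, generic polling helper in the same spirit is sketched below (illustrative only; the test itself uses org.apache.hadoop.hbase.Waiter, and the helper name here is hypothetical):

import java.util.concurrent.Callable;

// Illustrative polling helper in the spirit of org.apache.hadoop.hbase.Waiter:
// re-evaluate a condition until it holds or the timeout expires.
public final class PollUntil {
  private PollUntil() {}

  public static void waitFor(long timeoutMs, long intervalMs, Callable<Boolean> condition)
      throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.call()) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError("Condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }
}

In the test, the condition being polled is, for example, managerC.getOldSources().size() == 2 right after serverB is stopped, and == 0 once the peer is re-enabled and the recovered queues have drained.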
 
