Repository: hbase
Updated Branches:
  refs/heads/branch-2 0a6aec498 -> 075523dd1


HBASE-20561 The way we stop a ReplicationSource may cause the RS down


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/075523dd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/075523dd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/075523dd

Branch: refs/heads/branch-2
Commit: 075523dd1eabb027f3268f51a4aa0f17113e58d3
Parents: 0a6aec4
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Jun 12 22:19:39 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Wed Jun 13 18:05:27 2018 +0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         | 24 +++++++++++++++--
 .../regionserver/ReplicationSourceManager.java  | 28 +++++++++++++++++---
 .../hadoop/hbase/zookeeper/ZKWatcher.java       |  4 ++-
 3 files changed, 49 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/075523dd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d21d83c..b63712b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface {
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
     for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
-      worker.entryReader.interrupt();
-      worker.interrupt();
+      worker.entryReader.setReaderRunning(false);
     }
+
+    for (ReplicationSourceShipper worker : workers) {
+      if (worker.isAlive() || worker.entryReader.isAlive()) {
+        try {
+          // Wait worker to stop
+          Thread.sleep(this.sleepForRetries);
+        } catch (InterruptedException e) {
+          LOG.info("Interrupted while waiting " + worker.getName() + " to 
stop");
+          Thread.currentThread().interrupt();
+        }
+        // If worker still is alive after waiting, interrupt it
+        if (worker.isAlive()) {
+          worker.interrupt();
+        }
+        // If entry reader is alive after waiting, interrupt it
+        if (worker.entryReader.isAlive()) {
+          worker.entryReader.interrupt();
+        }
+      }
+    }
+
     if (this.replicationEndpoint != null) {
       this.replicationEndpoint.stop();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/075523dd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9b4a22c..a370867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -449,6 +450,24 @@ public class ReplicationSourceManager implements ReplicationListener {
     void exec() throws ReplicationException;
   }
 
+  /**
+   * Refresh replication source will terminate the old source first, then the 
source thread will be
+   * interrupted. Need to handle it instead of abort the region server.
+   */
+  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      if (e.getCause() != null && e.getCause() instanceof 
KeeperException.SystemErrorException
+          && e.getCause().getCause() != null && e.getCause()
+          .getCause() instanceof InterruptedException) {
+        throw new RuntimeException(
+            "Thread is interrupted, the replication source may be terminated");
+      }
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
+
   private void abortWhenFail(ReplicationQueueOperation op) {
     try {
       op.exec();
@@ -484,8 +503,9 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void logPositionAndCleanOldLogs(String queueId, boolean 
queueRecovered,
       WALEntryBatch entryBatch) {
     String fileName = entryBatch.getLastWalPath().getName();
-    abortWhenFail(() -> 
this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
-      entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+    interruptOrAbortWhenFail(() -> this.queueStorage
+        .setWALPosition(server.getServerName(), queueId, fileName, 
entryBatch.getLastWalPosition(),
+            entryBatch.getLastSeqIds()));
     cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
   }
 
@@ -523,7 +543,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
     LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
     for (String wal : walSet) {
-      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), 
id, wal));
+      interruptOrAbortWhenFail(() -> 
this.queueStorage.removeWAL(server.getServerName(), id, wal));
     }
     walSet.clear();
   }
@@ -886,7 +906,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   public void cleanUpHFileRefs(String peerId, List<String> files) {
-    abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
+    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, 
files));
   }
 
   int activeFailoverTaskCount() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/075523dd/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
index cceedfd..ce00af4 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
@@ -607,7 +607,9 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
   public void interruptedException(InterruptedException ie) throws 
KeeperException {
     interruptedExceptionNoThrow(ie, true);
     // Throw a system error exception to let upper level handle it
-    throw new KeeperException.SystemErrorException();
+    KeeperException keeperException = new 
KeeperException.SystemErrorException();
+    keeperException.initCause(ie);
+    throw keeperException;
   }
 
   /**

Reply via email to