[02/12] hbase git commit: HBASE-20561 The way we stop a ReplicationSource may cause the RS down

2018-06-14 Thread busbey
HBASE-20561 The way we stop a ReplicationSource may cause the RS down


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec664343
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec664343
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec664343

Branch: refs/heads/HBASE-20331
Commit: ec66434380aee62289ccf7b173d765bbe7083718
Parents: 8648af0
Author: Guanghao Zhang 
Authored: Tue Jun 12 22:19:39 2018 +0800
Committer: Guanghao Zhang 
Committed: Wed Jun 13 17:58:59 2018 +0800

--
 .../regionserver/ReplicationSource.java | 24 +++--
 .../regionserver/ReplicationSourceManager.java  | 28 +---
 .../hadoop/hbase/zookeeper/ZKWatcher.java   |  4 ++-
 3 files changed, 49 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/ec664343/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d21d83c..b63712b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface {
 Collection workers = workerThreads.values();
 for (ReplicationSourceShipper worker : workers) {
   worker.stopWorker();
-  worker.entryReader.interrupt();
-  worker.interrupt();
+  worker.entryReader.setReaderRunning(false);
 }
+
+for (ReplicationSourceShipper worker : workers) {
+  if (worker.isAlive() || worker.entryReader.isAlive()) {
+try {
+  // Wait worker to stop
+  Thread.sleep(this.sleepForRetries);
+} catch (InterruptedException e) {
+  LOG.info("Interrupted while waiting " + worker.getName() + " to stop");
+  Thread.currentThread().interrupt();
+}
+// If worker still is alive after waiting, interrupt it
+if (worker.isAlive()) {
+  worker.interrupt();
+}
+// If entry reader is alive after waiting, interrupt it
+if (worker.entryReader.isAlive()) {
+  worker.entryReader.interrupt();
+}
+  }
+}
+
 if (this.replicationEndpoint != null) {
   this.replicationEndpoint.stop();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec664343/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9b4a22c..a370867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -449,6 +450,24 @@ public class ReplicationSourceManager implements ReplicationListener {
 void exec() throws ReplicationException;
   }
 
+  /**
+   * Refresh replication source will terminate the old source first, then the source thread will be
+   * interrupted. Need to handle it instead of abort the region server.
+   */
+  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+try {
+  op.exec();
+} catch (ReplicationException e) {
+  if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
+  && e.getCause().getCause() != null && e.getCause()
+  .getCause() instanceof InterruptedException) {
+throw new RuntimeException(
+"Thread is interrupted, the replication source may be terminated");
+  }
+  server.abort("Failed to operate on replication queue", e);
+}
+  }
+
   private void abortWhenFail(ReplicationQueueOperation op) {
 try {
   op.exec();
@@ -484,8 +503,9 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   public void 

hbase git commit: HBASE-20561 The way we stop a ReplicationSource may cause the RS down

2018-06-13 Thread zghao
Repository: hbase
Updated Branches:
  refs/heads/branch-2 0a6aec498 -> 075523dd1


HBASE-20561 The way we stop a ReplicationSource may cause the RS down


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/075523dd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/075523dd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/075523dd

Branch: refs/heads/branch-2
Commit: 075523dd1eabb027f3268f51a4aa0f17113e58d3
Parents: 0a6aec4
Author: Guanghao Zhang 
Authored: Tue Jun 12 22:19:39 2018 +0800
Committer: Guanghao Zhang 
Committed: Wed Jun 13 18:05:27 2018 +0800

--
 .../regionserver/ReplicationSource.java | 24 +++--
 .../regionserver/ReplicationSourceManager.java  | 28 +---
 .../hadoop/hbase/zookeeper/ZKWatcher.java   |  4 ++-
 3 files changed, 49 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/075523dd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d21d83c..b63712b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface {
 Collection workers = workerThreads.values();
 for (ReplicationSourceShipper worker : workers) {
   worker.stopWorker();
-  worker.entryReader.interrupt();
-  worker.interrupt();
+  worker.entryReader.setReaderRunning(false);
 }
+
+for (ReplicationSourceShipper worker : workers) {
+  if (worker.isAlive() || worker.entryReader.isAlive()) {
+try {
+  // Wait worker to stop
+  Thread.sleep(this.sleepForRetries);
+} catch (InterruptedException e) {
+  LOG.info("Interrupted while waiting " + worker.getName() + " to stop");
+  Thread.currentThread().interrupt();
+}
+// If worker still is alive after waiting, interrupt it
+if (worker.isAlive()) {
+  worker.interrupt();
+}
+// If entry reader is alive after waiting, interrupt it
+if (worker.entryReader.isAlive()) {
+  worker.entryReader.interrupt();
+}
+  }
+}
+
 if (this.replicationEndpoint != null) {
   this.replicationEndpoint.stop();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/075523dd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9b4a22c..a370867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -449,6 +450,24 @@ public class ReplicationSourceManager implements ReplicationListener {
 void exec() throws ReplicationException;
   }
 
+  /**
+   * Refresh replication source will terminate the old source first, then the source thread will be
+   * interrupted. Need to handle it instead of abort the region server.
+   */
+  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+try {
+  op.exec();
+} catch (ReplicationException e) {
+  if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
+  && e.getCause().getCause() != null && e.getCause()
+  .getCause() instanceof InterruptedException) {
+throw new RuntimeException(
+"Thread is interrupted, the replication source may be terminated");
+  }
+  server.abort("Failed to operate on replication queue", e);
+}
+  }
+
   private void abortWhenFail(ReplicationQueueOperation op) {
 try {
   op.exec();
@@ -484,8 +503,9 @@ public class 

hbase git commit: HBASE-20561 The way we stop a ReplicationSource may cause the RS down

2018-06-13 Thread zghao
Repository: hbase
Updated Branches:
  refs/heads/master 8648af07d -> ec6643438


HBASE-20561 The way we stop a ReplicationSource may cause the RS down


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec664343
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec664343
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec664343

Branch: refs/heads/master
Commit: ec66434380aee62289ccf7b173d765bbe7083718
Parents: 8648af0
Author: Guanghao Zhang 
Authored: Tue Jun 12 22:19:39 2018 +0800
Committer: Guanghao Zhang 
Committed: Wed Jun 13 17:58:59 2018 +0800

--
 .../regionserver/ReplicationSource.java | 24 +++--
 .../regionserver/ReplicationSourceManager.java  | 28 +---
 .../hadoop/hbase/zookeeper/ZKWatcher.java   |  4 ++-
 3 files changed, 49 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/ec664343/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d21d83c..b63712b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface {
 Collection workers = workerThreads.values();
 for (ReplicationSourceShipper worker : workers) {
   worker.stopWorker();
-  worker.entryReader.interrupt();
-  worker.interrupt();
+  worker.entryReader.setReaderRunning(false);
 }
+
+for (ReplicationSourceShipper worker : workers) {
+  if (worker.isAlive() || worker.entryReader.isAlive()) {
+try {
+  // Wait worker to stop
+  Thread.sleep(this.sleepForRetries);
+} catch (InterruptedException e) {
+  LOG.info("Interrupted while waiting " + worker.getName() + " to stop");
+  Thread.currentThread().interrupt();
+}
+// If worker still is alive after waiting, interrupt it
+if (worker.isAlive()) {
+  worker.interrupt();
+}
+// If entry reader is alive after waiting, interrupt it
+if (worker.entryReader.isAlive()) {
+  worker.entryReader.interrupt();
+}
+  }
+}
+
 if (this.replicationEndpoint != null) {
   this.replicationEndpoint.stop();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec664343/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9b4a22c..a370867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -449,6 +450,24 @@ public class ReplicationSourceManager implements ReplicationListener {
 void exec() throws ReplicationException;
   }
 
+  /**
+   * Refresh replication source will terminate the old source first, then the source thread will be
+   * interrupted. Need to handle it instead of abort the region server.
+   */
+  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+try {
+  op.exec();
+} catch (ReplicationException e) {
+  if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
+  && e.getCause().getCause() != null && e.getCause()
+  .getCause() instanceof InterruptedException) {
+throw new RuntimeException(
+"Thread is interrupted, the replication source may be terminated");
+  }
+  server.abort("Failed to operate on replication queue", e);
+}
+  }
+
   private void abortWhenFail(ReplicationQueueOperation op) {
 try {
   op.exec();
@@ -484,8 +503,9 @@ public class