[02/12] hbase git commit: HBASE-20561 The way we stop a ReplicationSource may cause the RS down
HBASE-20561 The way we stop a ReplicationSource may cause the RS down Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec664343 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec664343 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec664343 Branch: refs/heads/HBASE-20331 Commit: ec66434380aee62289ccf7b173d765bbe7083718 Parents: 8648af0 Author: Guanghao Zhang Authored: Tue Jun 12 22:19:39 2018 +0800 Committer: Guanghao Zhang Committed: Wed Jun 13 17:58:59 2018 +0800 -- .../regionserver/ReplicationSource.java | 24 +++-- .../regionserver/ReplicationSourceManager.java | 28 +--- .../hadoop/hbase/zookeeper/ZKWatcher.java | 4 ++- 3 files changed, 49 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/ec664343/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d21d83c..b63712b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface { Collection<ReplicationSourceShipper> workers = workerThreads.values(); for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - worker.entryReader.interrupt(); - worker.interrupt(); + worker.entryReader.setReaderRunning(false); } + +for (ReplicationSourceShipper worker : workers) { + if (worker.isAlive() || worker.entryReader.isAlive()) { +try { + // Wait worker to stop + Thread.sleep(this.sleepForRetries); +} catch (InterruptedException e) { + LOG.info("Interrupted while waiting " + worker.getName() + " to stop"); + 
Thread.currentThread().interrupt(); +} +// If worker still is alive after waiting, interrupt it +if (worker.isAlive()) { + worker.interrupt(); +} +// If entry reader is alive after waiting, interrupt it +if (worker.entryReader.isAlive()) { + worker.entryReader.interrupt(); +} + } +} + if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec664343/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9b4a22c..a370867 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,6 +450,24 @@ public class ReplicationSourceManager implements ReplicationListener { void exec() throws ReplicationException; } + /** + * Refresh replication source will terminate the old source first, then the source thread will be + * interrupted. Need to handle it instead of abort the region server. 
+ */ + private void interruptOrAbortWhenFail(ReplicationQueueOperation op) { +try { + op.exec(); +} catch (ReplicationException e) { + if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException + && e.getCause().getCause() != null && e.getCause() + .getCause() instanceof InterruptedException) { +throw new RuntimeException( +"Thread is interrupted, the replication source may be terminated"); + } + server.abort("Failed to operate on replication queue", e); +} + } + private void abortWhenFail(ReplicationQueueOperation op) { try { op.exec(); @@ -484,8 +503,9 @@ public class ReplicationSourceManager implements ReplicationListener { public void
hbase git commit: HBASE-20561 The way we stop a ReplicationSource may cause the RS down
Repository: hbase Updated Branches: refs/heads/branch-2 0a6aec498 -> 075523dd1 HBASE-20561 The way we stop a ReplicationSource may cause the RS down Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/075523dd Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/075523dd Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/075523dd Branch: refs/heads/branch-2 Commit: 075523dd1eabb027f3268f51a4aa0f17113e58d3 Parents: 0a6aec4 Author: Guanghao Zhang Authored: Tue Jun 12 22:19:39 2018 +0800 Committer: Guanghao Zhang Committed: Wed Jun 13 18:05:27 2018 +0800 -- .../regionserver/ReplicationSource.java | 24 +++-- .../regionserver/ReplicationSourceManager.java | 28 +--- .../hadoop/hbase/zookeeper/ZKWatcher.java | 4 ++- 3 files changed, 49 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/075523dd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d21d83c..b63712b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface { Collection<ReplicationSourceShipper> workers = workerThreads.values(); for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - worker.entryReader.interrupt(); - worker.interrupt(); + worker.entryReader.setReaderRunning(false); } + +for (ReplicationSourceShipper worker : workers) { + if (worker.isAlive() || worker.entryReader.isAlive()) { +try { + // Wait worker to stop + Thread.sleep(this.sleepForRetries); +} catch (InterruptedException e) { + 
LOG.info("Interrupted while waiting " + worker.getName() + " to stop"); + Thread.currentThread().interrupt(); +} +// If worker still is alive after waiting, interrupt it +if (worker.isAlive()) { + worker.interrupt(); +} +// If entry reader is alive after waiting, interrupt it +if (worker.entryReader.isAlive()) { + worker.entryReader.interrupt(); +} + } +} + if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/075523dd/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9b4a22c..a370867 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,6 +450,24 @@ public class ReplicationSourceManager implements ReplicationListener { void exec() throws ReplicationException; } + /** + * Refresh replication source will terminate the old source first, then the source thread will be + * interrupted. Need to handle it instead of abort the region server. 
+ */ + private void interruptOrAbortWhenFail(ReplicationQueueOperation op) { +try { + op.exec(); +} catch (ReplicationException e) { + if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException + && e.getCause().getCause() != null && e.getCause() + .getCause() instanceof InterruptedException) { +throw new RuntimeException( +"Thread is interrupted, the replication source may be terminated"); + } + server.abort("Failed to operate on replication queue", e); +} + } + private void abortWhenFail(ReplicationQueueOperation op) { try { op.exec(); @@ -484,8 +503,9 @@ public class
hbase git commit: HBASE-20561 The way we stop a ReplicationSource may cause the RS down
Repository: hbase Updated Branches: refs/heads/master 8648af07d -> ec6643438 HBASE-20561 The way we stop a ReplicationSource may cause the RS down Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec664343 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec664343 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec664343 Branch: refs/heads/master Commit: ec66434380aee62289ccf7b173d765bbe7083718 Parents: 8648af0 Author: Guanghao Zhang Authored: Tue Jun 12 22:19:39 2018 +0800 Committer: Guanghao Zhang Committed: Wed Jun 13 17:58:59 2018 +0800 -- .../regionserver/ReplicationSource.java | 24 +++-- .../regionserver/ReplicationSourceManager.java | 28 +--- .../hadoop/hbase/zookeeper/ZKWatcher.java | 4 ++- 3 files changed, 49 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/ec664343/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d21d83c..b63712b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface { Collection<ReplicationSourceShipper> workers = workerThreads.values(); for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - worker.entryReader.interrupt(); - worker.interrupt(); + worker.entryReader.setReaderRunning(false); } + +for (ReplicationSourceShipper worker : workers) { + if (worker.isAlive() || worker.entryReader.isAlive()) { +try { + // Wait worker to stop + Thread.sleep(this.sleepForRetries); +} catch (InterruptedException e) { + 
LOG.info("Interrupted while waiting " + worker.getName() + " to stop"); + Thread.currentThread().interrupt(); +} +// If worker still is alive after waiting, interrupt it +if (worker.isAlive()) { + worker.interrupt(); +} +// If entry reader is alive after waiting, interrupt it +if (worker.entryReader.isAlive()) { + worker.entryReader.interrupt(); +} + } +} + if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec664343/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9b4a22c..a370867 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,6 +450,24 @@ public class ReplicationSourceManager implements ReplicationListener { void exec() throws ReplicationException; } + /** + * Refresh replication source will terminate the old source first, then the source thread will be + * interrupted. Need to handle it instead of abort the region server. 
+ */ + private void interruptOrAbortWhenFail(ReplicationQueueOperation op) { +try { + op.exec(); +} catch (ReplicationException e) { + if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException + && e.getCause().getCause() != null && e.getCause() + .getCause() instanceof InterruptedException) { +throw new RuntimeException( +"Thread is interrupted, the replication source may be terminated"); + } + server.abort("Failed to operate on replication queue", e); +} + } + private void abortWhenFail(ReplicationQueueOperation op) { try { op.exec(); @@ -484,8 +503,9 @@ public class