Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 00c91b01e -> 0fda2bc9e


HBASE-16081 Wait for replication tasks to complete before shutting down the ThreadPoolExecutor in HBaseInterClusterReplicationEndpoint

Signed-off-by: Mikhail Antonov <anto...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0fda2bc9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0fda2bc9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0fda2bc9

Branch: refs/heads/branch-1.3
Commit: 0fda2bc9e7cbd58d4e67d0e9dcc420bc7ea98eab
Parents: 00c91b0
Author: Joseph Hwang <j...@fb.com>
Authored: Mon Jul 11 13:17:56 2016 -0700
Committer: Mikhail Antonov <anto...@apache.org>
Committed: Mon Jul 11 18:10:54 2016 -0700

----------------------------------------------------------------------
 .../hbase/replication/ReplicationEndpoint.java  |  7 +++-
 .../HBaseInterClusterReplicationEndpoint.java   | 34 +++++++++++++++++---
 .../regionserver/ReplicationSourceManager.java  |  3 +-
 3 files changed, 36 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0fda2bc9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index ac1257f..a88e454 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,6 +59,7 @@ public interface ReplicationEndpoint extends Service {
     private final String peerId;
     private final UUID clusterId;
     private final MetricsSource metrics;
+    private final Abortable abortable;
 
     @InterfaceAudience.Private
     public Context(
@@ -68,7 +70,8 @@ public interface ReplicationEndpoint extends Service {
         final UUID clusterId,
         final ReplicationPeer replicationPeer,
         final MetricsSource metrics,
-        final TableDescriptors tableDescriptors) {
+        final TableDescriptors tableDescriptors,
+        final Abortable abortable) {
       this.peerConfig = peerConfig;
       this.conf = conf;
       this.fs = fs;
@@ -77,6 +80,7 @@ public interface ReplicationEndpoint extends Service {
       this.replicationPeer = replicationPeer;
       this.metrics = metrics;
       this.tableDescriptors = tableDescriptors;
+      this.abortable = abortable;
     }
     public Configuration getConfiguration() {
       return conf;
@@ -102,6 +106,7 @@ public interface ReplicationEndpoint extends Service {
     public TableDescriptors getTableDescriptors() {
       return tableDescriptors;
     }
+    public Abortable getAbortable() { return abortable; }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/0fda2bc9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index b94d21d..548f716 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -71,17 +72,19 @@ import com.google.common.annotations.VisibleForTesting;
 public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoint {
 
   private static final Log LOG = 
LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
-  private HConnection conn;
 
-  private Configuration conf;
+  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
+  private HConnection conn;
+  private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
-
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
+  // Amount of time for shutdown to wait for all tasks to complete
+  private long maxTerminationWait;
   //Metrics for this source
   private MetricsSource metrics;
   // Handles connecting to peer region servers
@@ -93,6 +96,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
+  private Abortable abortable;
 
   @Override
   public void init(Context context) throws IOException {
@@ -102,6 +106,13 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
     this.maxRetriesMultiplier = 
this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = 
this.conf.getInt("replication.source.socketTimeoutMultiplier",
         maxRetriesMultiplier);
+    // A Replicator job is bound by the RPC timeout. We will wait this long 
for all Replicator
+    // tasks to terminate when doStop() is called.
+    long maxTerminationWaitMultiplier = this.conf.getLong(
+        "replication.source.maxterminationmultiplier",
+        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
+    this.maxTerminationWait = maxTerminationWaitMultiplier *
+        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     // TODO: This connection is replication specific or we should make it 
particular to
     // replication and make replication specific settings such as compression 
or codec to use
     // passing Cells.
@@ -117,6 +128,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
     this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, 
TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>());
     this.exec.allowCoreThreadTimeOut(true);
+    this.abortable = ctx.getAbortable();
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -211,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
         
entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
       }
     }
-    while (this.isRunning()) {
+    while (this.isRunning() && !exec.isShutdown()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -321,7 +333,19 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
         LOG.warn("Failed to close the connection");
       }
     }
-    exec.shutdownNow();
+    // Allow currently running replication tasks to finish
+    exec.shutdown();
+    try {
+      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+    }
+    // Abort if the tasks did not terminate in time
+    if (!exec.isTerminated()) {
+      String errMsg = "HBaseInterClusterReplicationEndpoint termination 
failed. The " +
+          "ThreadPoolExecutor failed to finish all tasks within " + 
maxTerminationWait + "ms. " +
+          "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
+      abortable.abort(errMsg, new IOException(errMsg));
+    }
     notifyStopped();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0fda2bc9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 6135328..dcc3634 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -501,8 +501,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
     // init replication endpoint
     replicationEndpoint.init(new 
ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
-      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, 
tableDescriptors));
-
+      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, 
tableDescriptors, server));
     return src;
   }
 

Reply via email to