Repository: hbase
Updated Branches:
  refs/heads/branch-2 3757915da -> a9d9fa35a


HBASE-17165 Make use of retry setting in LoadIncrementalHFiles & fix test


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a9d9fa35
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a9d9fa35
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a9d9fa35

Branch: refs/heads/branch-2
Commit: a9d9fa35a2d99e91b16b05afb40d5be5dd0c98ec
Parents: 3757915
Author: Mike Grimes <grime...@amazon.com>
Authored: Fri Nov 17 19:47:54 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Mar 13 14:59:56 2018 -0700

----------------------------------------------------------------------
 .../hbase/tool/LoadIncrementalHFiles.java       | 22 +++++++++++++++++---
 .../TestLoadIncrementalHFilesSplitRecovery.java | 10 ++++-----
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a9d9fa35/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 3cff047..e6b64b4 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -132,6 +132,7 @@ public class LoadIncrementalHFiles extends Configured 
implements Tool {
   private final FsDelegationToken fsDelegationToken;
   private final UserProvider userProvider;
   private final int nrThreads;
+  private AtomicInteger numRetries;
   private final RpcControllerFactory rpcControllerFactory;
 
   private String bulkToken;
@@ -178,6 +179,7 @@ public class LoadIncrementalHFiles extends Configured 
implements Tool {
     maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 
32);
     nrThreads = conf.getInt("hbase.loadincremental.threads.max",
       Runtime.getRuntime().availableProcessors());
+    numRetries = new AtomicInteger(0);
     rpcControllerFactory = new RpcControllerFactory(conf);
   }
 
@@ -784,8 +786,8 @@ public class LoadIncrementalHFiles extends Configured 
implements Tool {
   protected List<LoadQueueItem> 
tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
       final TableName tableName, final byte[] first, final 
Collection<LoadQueueItem> lqis)
       throws IOException {
+    List<LoadQueueItem> toRetry = new ArrayList<>();
     try {
-      List<LoadQueueItem> toRetry = new ArrayList<>();
       Configuration conf = getConf();
       byte[] region = RpcRetryingCallerFactory.instantiate(conf, 
null).<byte[]> newCaller()
           .callWithRetries(serviceCallable, Integer.MAX_VALUE);
@@ -799,8 +801,22 @@ public class LoadIncrementalHFiles extends Configured 
implements Tool {
       return toRetry;
     } catch (IOException e) {
       LOG.error("Encountered unrecoverable error from region server, 
additional details: " +
-          serviceCallable.getExceptionMessageAdditionalDetail(),
-        e);
+                      serviceCallable.getExceptionMessageAdditionalDetail(),
+              e);
+      LOG.warn(
+              "Received a " + e.getClass().getSimpleName()
+                      + " from region server: "
+                      + serviceCallable.getExceptionMessageAdditionalDetail(), 
e);
+      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
+              && numRetries.get() < getConf().getInt(
+              HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+              HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
+        LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
+                + numRetries.incrementAndGet());
+        toRetry.addAll(lqis);
+        return toRetry;
+      }
+      LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
       throw e;
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a9d9fa35/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
index 7e051b3..48a6d23 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
@@ -331,7 +331,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
   @Test
   public void testRetryOnIOException() throws Exception {
     final TableName table = TableName.valueOf(name.getMethodName());
-    final AtomicInteger calls = new AtomicInteger(1);
+    final AtomicInteger calls = new AtomicInteger(0);
     final Connection conn = 
ConnectionFactory.createConnection(util.getConfiguration());
     util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
     
util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, 
true);
@@ -340,9 +340,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
       protected List<LoadQueueItem> tryAtomicRegionLoad(
           ClientServiceCallable<byte[]> serverCallable, TableName tableName, 
final byte[] first,
           Collection<LoadQueueItem> lqis) throws IOException {
-        if (calls.getAndIncrement() < util.getConfiguration().getInt(
-          HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) -
-            1) {
+        if (calls.get() < util.getConfiguration().getInt(
+          HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
           ClientServiceCallable<byte[]> newServerCallable = new 
ClientServiceCallable<byte[]>(conn,
               tableName, first, new 
RpcControllerFactory(util.getConfiguration()).newController(),
               HConstants.PRIORITY_UNSET) {
@@ -351,6 +350,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
               throw new IOException("Error calling something on RegionServer");
             }
           };
+          calls.getAndIncrement();
           return super.tryAtomicRegionLoad(newServerCallable, tableName, 
first, lqis);
         } else {
           return super.tryAtomicRegionLoad(serverCallable, tableName, first, 
lqis);
@@ -360,8 +360,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     setupTable(conn, table, 10);
     Path dir = buildBulkFiles(table, 1);
     lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), 
conn.getRegionLocator(table));
+    assertEquals(calls.get(), 2);
     
util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, 
false);
-
   }
 
   private ClusterConnection getMockedConnection(final Configuration conf)

Reply via email to