Repository: hbase Updated Branches: refs/heads/branch-2 3757915da -> a9d9fa35a
HBASE-17165 Make use of retry setting in LoadIncrementalHFiles & fix test Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a9d9fa35 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a9d9fa35 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a9d9fa35 Branch: refs/heads/branch-2 Commit: a9d9fa35a2d99e91b16b05afb40d5be5dd0c98ec Parents: 3757915 Author: Mike Grimes <grime...@amazon.com> Authored: Fri Nov 17 19:47:54 2017 -0800 Committer: Michael Stack <st...@apache.org> Committed: Tue Mar 13 14:59:56 2018 -0700 ---------------------------------------------------------------------- .../hbase/tool/LoadIncrementalHFiles.java | 22 +++++++++++++++++--- .../TestLoadIncrementalHFilesSplitRecovery.java | 10 ++++----- 2 files changed, 24 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a9d9fa35/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java index 3cff047..e6b64b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -132,6 +132,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private final FsDelegationToken fsDelegationToken; private final UserProvider userProvider; private final int nrThreads; + private AtomicInteger numRetries; private final RpcControllerFactory rpcControllerFactory; private String bulkToken; @@ -178,6 +179,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { maxFilesPerRegionPerFamily = 
conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); nrThreads = conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors()); + numRetries = new AtomicInteger(0); rpcControllerFactory = new RpcControllerFactory(conf); } @@ -784,8 +786,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable, final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis) throws IOException { + List<LoadQueueItem> toRetry = new ArrayList<>(); try { - List<LoadQueueItem> toRetry = new ArrayList<>(); Configuration conf = getConf(); byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller() .callWithRetries(serviceCallable, Integer.MAX_VALUE); @@ -799,8 +801,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool { return toRetry; } catch (IOException e) { LOG.error("Encountered unrecoverable error from region server, additional details: " + - serviceCallable.getExceptionMessageAdditionalDetail(), - e); + serviceCallable.getExceptionMessageAdditionalDetail(), + e); + LOG.warn( + "Received a " + e.getClass().getSimpleName() + + " from region server: " + + serviceCallable.getExceptionMessageAdditionalDetail(), e); + if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) + && numRetries.get() < getConf().getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { + LOG.warn("Will attempt to retry loading failed HFiles. Retry #" + + numRetries.incrementAndGet()); + toRetry.addAll(lqis); + return toRetry; + } + LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover"); throw e; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a9d9fa35/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java index 7e051b3..48a6d23 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java @@ -331,7 +331,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { @Test public void testRetryOnIOException() throws Exception { final TableName table = TableName.valueOf(name.getMethodName()); - final AtomicInteger calls = new AtomicInteger(1); + final AtomicInteger calls = new AtomicInteger(0); final Connection conn = ConnectionFactory.createConnection(util.getConfiguration()); util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); @@ -340,9 +340,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { protected List<LoadQueueItem> tryAtomicRegionLoad( ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis) throws IOException { - if (calls.getAndIncrement() < util.getConfiguration().getInt( - HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - - 1) { + if (calls.get() < util.getConfiguration().getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn, tableName, first, new 
RpcControllerFactory(util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @@ -351,6 +350,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { throw new IOException("Error calling something on RegionServer"); } }; + calls.getAndIncrement(); return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis); } else { return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis); @@ -360,8 +360,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { setupTable(conn, table, 10); Path dir = buildBulkFiles(table, 1); lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table)); + assertEquals(calls.get(), 2); util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); - } private ClusterConnection getMockedConnection(final Configuration conf)