HBASE-15931 Add log for long-running tasks in AsyncProcess
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53eb27bb Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53eb27bb Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53eb27bb Branch: refs/heads/hbase-12439 Commit: 53eb27bb60bc736f0c56c7a2facfb6e6ccb91be5 Parents: cbb95cd Author: Yu Li <l...@apache.org> Authored: Thu Jun 2 12:00:42 2016 +0800 Committer: Yu Li <l...@apache.org> Committed: Thu Jun 2 12:00:42 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 53 ++++++++++++++++---- .../hbase/client/BufferedMutatorImpl.java | 3 +- .../hadoop/hbase/client/TestAsyncProcess.java | 5 +- 3 files changed, 49 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 6f7ba59..812e4bf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -120,6 +120,12 @@ class AsyncProcess { */ public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; + private final int thresholdToLogUndoneTaskDetails; + private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = + "hbase.client.threshold.log.details"; + private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; + private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2; + /** * The context used to wait for results from one submit call. * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), @@ -332,6 +338,10 @@ class AsyncProcess { this.rpcCallerFactory = rpcCaller; this.rpcFactory = rpcFactory; this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false); + + this.thresholdToLogUndoneTaskDetails = + conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, + DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); } /** @@ -389,7 +399,7 @@ class AsyncProcess { List<Integer> locationErrorRows = null; do { // Wait until there is at least one slot for a new task. - waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); + waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); // Remember the previous decisions about regions or region servers we put in the // final multi. @@ -1765,18 +1775,19 @@ class AsyncProcess { @VisibleForTesting /** Waits until all outstanding tasks are done. Used in tests. */ void waitUntilDone() throws InterruptedIOException { - waitForMaximumCurrentTasks(0); + waitForMaximumCurrentTasks(0, null); } /** Wait until the async does not have more than max tasks in progress. */ - private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { - waitForMaximumCurrentTasks(max, tasksInProgress, id); + private void waitForMaximumCurrentTasks(int max, String tableName) + throws InterruptedIOException { + waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName); } // Break out this method so testable @VisibleForTesting - static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id) - throws InterruptedIOException { + void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id, + String tableName) throws InterruptedIOException { long lastLog = EnvironmentEdgeManager.currentTime(); long currentInProgress, oldInProgress = Long.MAX_VALUE; while ((currentInProgress = tasksInProgress.get()) > max) { @@ -1785,7 +1796,11 @@ class AsyncProcess { if (now > lastLog + 10000) { lastLog = now; LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" - + max + ", tasksInProgress=" + currentInProgress); + + max + ", tasksInProgress=" + currentInProgress + + " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName); + if (currentInProgress <= thresholdToLogUndoneTaskDetails) { + logDetailsOfUndoneTasks(currentInProgress); + } } } oldInProgress = currentInProgress; @@ -1802,6 +1817,25 @@ class AsyncProcess { } } + private void logDetailsOfUndoneTasks(long taskInProgress) { + ArrayList<ServerName> servers = new ArrayList<ServerName>(); + for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) { + if (entry.getValue().get() > 0) { + servers.add(entry.getKey()); + } + } + LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers); + if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) { + ArrayList<String> regions = new ArrayList<String>(); + for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) { + if (entry.getValue().get() > 0) { + regions.add(Bytes.toString(entry.getKey())); + } + } + LOG.info("Regions against which left over task(s) are processed: " + regions); + } + } + /** * Only used w/useGlobalErrors ctor argument, for HTable backward compat. * @return Whether there were any errors in any request since the last time @@ -1817,12 +1851,13 @@ class AsyncProcess { * failed operations themselves. * @param failedRows an optional list into which the rows that failed since the last time * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved. + * @param tableName name of the table * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)} * was called, or AP was created. */ public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( - List<Row> failedRows) throws InterruptedIOException { - waitForMaximumCurrentTasks(0); + List<Row> failedRows, String tableName) throws InterruptedIOException { + waitForMaximumCurrentTasks(0, tableName); if (!globalErrors.hasErrors()) { return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 2a7effe..e98ad4e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -238,7 +238,8 @@ public class BufferedMutatorImpl implements BufferedMutator { while (!buffer.isEmpty()) { ap.submit(tableName, buffer, true, null, false); } - RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); + RetriesExhaustedWithDetailsException error = + ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); if (error != null) { if (listener == null) { throw error; http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 839a33a..d943316 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -1135,16 +1135,17 @@ public class TestAsyncProcess { } @Test - public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException { + public void testWaitForMaximumCurrentTasks() throws Exception { final AtomicLong tasks = new AtomicLong(0); final AtomicInteger max = new AtomicInteger(0); final CyclicBarrier barrier = new CyclicBarrier(2); + final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf); Runnable runnable = new Runnable() { @Override public void run() { try { barrier.await(); - AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1); + ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null); } catch (InterruptedIOException e) { Assert.fail(e.getMessage()); } catch (InterruptedException e) {