HBASE-15931 Add log for long-running tasks in AsyncProcess

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53eb27bb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53eb27bb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53eb27bb

Branch: refs/heads/hbase-12439
Commit: 53eb27bb60bc736f0c56c7a2facfb6e6ccb91be5
Parents: cbb95cd
Author: Yu Li <l...@apache.org>
Authored: Thu Jun 2 12:00:42 2016 +0800
Committer: Yu Li <l...@apache.org>
Committed: Thu Jun 2 12:00:42 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 53 ++++++++++++++++----
 .../hbase/client/BufferedMutatorImpl.java       |  3 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  5 +-
 3 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 6f7ba59..812e4bf 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -120,6 +120,12 @@ class AsyncProcess {
    */
   public static final String LOG_DETAILS_FOR_BATCH_ERROR = 
"hbase.client.log.batcherrors.details";
 
+  private final int thresholdToLogUndoneTaskDetails;
+  private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
+      "hbase.client.threshold.log.details";
+  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
+  private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
+
   /**
    * The context used to wait for results from one submit call.
    * 1) If AsyncProcess is set to track errors globally, and not per call (for 
HTable puts),
@@ -332,6 +338,10 @@ class AsyncProcess {
     this.rpcCallerFactory = rpcCaller;
     this.rpcFactory = rpcFactory;
     this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, 
false);
+
+    this.thresholdToLogUndoneTaskDetails =
+        conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
+          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
   }
 
   /**
@@ -389,7 +399,7 @@ class AsyncProcess {
     List<Integer> locationErrorRows = null;
     do {
       // Wait until there is at least one slot for a new task.
-      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
+      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, 
tableName.getNameAsString());
 
       // Remember the previous decisions about regions or region servers we 
put in the
       //  final multi.
@@ -1765,18 +1775,19 @@ class AsyncProcess {
   @VisibleForTesting
   /** Waits until all outstanding tasks are done. Used in tests. */
   void waitUntilDone() throws InterruptedIOException {
-    waitForMaximumCurrentTasks(0);
+    waitForMaximumCurrentTasks(0, null);
   }
 
   /** Wait until the async does not have more than max tasks in progress. */
-  private void waitForMaximumCurrentTasks(int max) throws 
InterruptedIOException {
-    waitForMaximumCurrentTasks(max, tasksInProgress, id);
+  private void waitForMaximumCurrentTasks(int max, String tableName)
+      throws InterruptedIOException {
+    waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
   }
 
   // Break out this method so testable
   @VisibleForTesting
-  static void waitForMaximumCurrentTasks(int max, final AtomicLong 
tasksInProgress, final long id)
-  throws InterruptedIOException {
+  void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, 
final long id,
+      String tableName) throws InterruptedIOException {
     long lastLog = EnvironmentEdgeManager.currentTime();
     long currentInProgress, oldInProgress = Long.MAX_VALUE;
     while ((currentInProgress = tasksInProgress.get()) > max) {
@@ -1785,7 +1796,11 @@ class AsyncProcess {
         if (now > lastLog + 10000) {
           lastLog = now;
           LOG.info("#" + id + ", waiting for some tasks to finish. Expected 
max="
-              + max + ", tasksInProgress=" + currentInProgress);
+              + max + ", tasksInProgress=" + currentInProgress +
+              " hasError=" + hasError() + tableName == null ? "" : ", 
tableName=" + tableName);
+          if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
+            logDetailsOfUndoneTasks(currentInProgress);
+          }
         }
       }
       oldInProgress = currentInProgress;
@@ -1802,6 +1817,25 @@ class AsyncProcess {
     }
   }
 
+  private void logDetailsOfUndoneTasks(long taskInProgress) {
+    ArrayList<ServerName> servers = new ArrayList<ServerName>();
+    for (Map.Entry<ServerName, AtomicInteger> entry : 
taskCounterPerServer.entrySet()) {
+      if (entry.getValue().get() > 0) {
+        servers.add(entry.getKey());
+      }
+    }
+    LOG.info("Left over " + taskInProgress + " task(s) are processed on 
server(s): " + servers);
+    if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
+      ArrayList<String> regions = new ArrayList<String>();
+      for (Map.Entry<byte[], AtomicInteger> entry : 
taskCounterPerRegion.entrySet()) {
+        if (entry.getValue().get() > 0) {
+          regions.add(Bytes.toString(entry.getKey()));
+        }
+      }
+      LOG.info("Regions against which left over task(s) are processed: " + 
regions);
+    }
+  }
+
   /**
    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
    * @return Whether there were any errors in any request since the last time
@@ -1817,12 +1851,13 @@ class AsyncProcess {
    * failed operations themselves.
    * @param failedRows an optional list into which the rows that failed since 
the last time
    *        {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was 
created, are saved.
+   * @param tableName name of the table
    * @return all the errors since the last time {@link 
#waitForAllPreviousOpsAndReset(List)}
    *          was called, or AP was created.
    */
   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
-      List<Row> failedRows) throws InterruptedIOException {
-    waitForMaximumCurrentTasks(0);
+      List<Row> failedRows, String tableName) throws InterruptedIOException {
+    waitForMaximumCurrentTasks(0, tableName);
     if (!globalErrors.hasErrors()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 2a7effe..e98ad4e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -238,7 +238,8 @@ public class BufferedMutatorImpl implements BufferedMutator 
{
         while (!buffer.isEmpty()) {
           ap.submit(tableName, buffer, true, null, false);
         }
-        RetriesExhaustedWithDetailsException error = 
ap.waitForAllPreviousOpsAndReset(null);
+        RetriesExhaustedWithDetailsException error =
+            ap.waitForAllPreviousOpsAndReset(null, 
tableName.getNameAsString());
         if (error != null) {
           if (listener == null) {
             throw error;

http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 839a33a..d943316 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -1135,16 +1135,17 @@ public class TestAsyncProcess {
   }
 
   @Test
-  public void testWaitForMaximumCurrentTasks() throws InterruptedException, 
BrokenBarrierException {
+  public void testWaitForMaximumCurrentTasks() throws Exception {
     final AtomicLong tasks = new AtomicLong(0);
     final AtomicInteger max = new AtomicInteger(0);
     final CyclicBarrier barrier = new CyclicBarrier(2);
+    final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
     Runnable runnable = new Runnable() {
       @Override
       public void run() {
         try {
           barrier.await();
-          AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
+          ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
         } catch (InterruptedIOException e) {
           Assert.fail(e.getMessage());
         } catch (InterruptedException e) {

Reply via email to