HBASE-17723 ClientAsyncPrefetchScanner may end prematurely when the size of the cache is one


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0ecb6782
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0ecb6782
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0ecb6782

Branch: refs/heads/hbase-12439
Commit: 0ecb6782593039af75a45c25481f1dbf7cbd6928
Parents: a49bc58
Author: Chia-Ping Tsai <chia7...@gmail.com>
Authored: Sun Mar 12 13:48:12 2017 +0800
Committer: Chia-Ping Tsai <chia7...@gmail.com>
Committed: Thu Mar 16 03:07:20 2017 +0800

----------------------------------------------------------------------
 .../client/ClientAsyncPrefetchScanner.java      | 61 +++++++++++-------
 .../client/TestScannersFromClientSide.java      | 66 ++++++++++++++------
 2 files changed, 88 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0ecb6782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index b1fc2da..007e638 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 
 import java.io.IOException;
@@ -26,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -62,6 +64,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
   private AtomicBoolean prefetchRunning;
   // an attribute for synchronizing close between scanner and prefetch threads
   private AtomicLong closingThreadId;
+  // used for testing
+  private Consumer<Boolean> prefetchListener;
   private static final int NO_THREAD = -1;
 
   public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, 
TableName name,
@@ -72,6 +76,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
         replicaCallTimeoutMicroSecondScan);
   }
 
+  @VisibleForTesting
+  void setPrefetchListener(Consumer<Boolean> prefetchListener) {
+    this.prefetchListener = prefetchListener;
+  }
+
   @Override
   protected void initCache() {
     // concurrent cache
@@ -88,34 +97,39 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
   public Result next() throws IOException {
 
     try {
-      handleException();
+      boolean hasExecutedPrefetch = false;
+      do {
+        handleException();
 
-      // If the scanner is closed and there's nothing left in the cache, next 
is a no-op.
-      if (getCacheCount() == 0 && this.closed) {
-        return null;
-      }
-      if (prefetchCondition()) {
-        // run prefetch in the background only if no prefetch is already 
running
-        if (!isPrefetchRunning()) {
-          if (prefetchRunning.compareAndSet(false, true)) {
-            getPool().execute(prefetchRunnable);
+        // If the scanner is closed and there's nothing left in the cache, 
next is a no-op.
+        if (getCacheCount() == 0 && this.closed) {
+          return null;
+        }
+
+        if (prefetchCondition()) {
+          // run prefetch in the background only if no prefetch is already 
running
+          if (!isPrefetchRunning()) {
+            if (prefetchRunning.compareAndSet(false, true)) {
+              getPool().execute(prefetchRunnable);
+              hasExecutedPrefetch = true;
+            }
+          }
+        }
+
+        while (isPrefetchRunning()) {
+          // prefetch running or still pending
+          if (getCacheCount() > 0) {
+            return pollCache();
+          } else {
+            // (busy) wait for a record - sleep
+            Threads.sleep(1);
           }
         }
-      }
 
-      while (isPrefetchRunning()) {
-        // prefetch running or still pending
         if (getCacheCount() > 0) {
           return pollCache();
-        } else {
-          // (busy) wait for a record - sleep
-          Threads.sleep(1);
         }
-      }
-
-      if (getCacheCount() > 0) {
-        return pollCache();
-      }
+      } while (!hasExecutedPrefetch);
 
       // if we exhausted this scanner before calling close, write out the scan 
metrics
       writeScanMetrics();
@@ -219,11 +233,16 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
 
     @Override
     public void run() {
+      boolean succeed = false;
       try {
         loadCache();
+        succeed = true;
       } catch (Exception e) {
         exceptionsQueue.add(e);
       } finally {
+        if (prefetchListener != null) {
+          prefetchListener.accept(succeed);
+        }
         prefetchRunning.set(false);
         if(closed) {
           if (closingThreadId.compareAndSet(NO_THREAD, 
Thread.currentThread().getId())) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ecb6782/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 6f40093..e5c19ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
 import org.apache.commons.logging.Log;
@@ -656,7 +657,9 @@ public class TestScannersFromClientSide {
     testAsyncScanner(TableName.valueOf(name.getMethodName()),
       2,
       3,
-      10);
+      10,
+      -1,
+      null);
   }
 
   @Test
@@ -664,11 +667,28 @@ public class TestScannersFromClientSide {
     testAsyncScanner(TableName.valueOf(name.getMethodName()),
       30000,
       1,
-      1);
+      1,
+      -1,
+      null);
+  }
+
+  @Test
+  public void testAsyncScannerWithoutCaching() throws Exception {
+    testAsyncScanner(TableName.valueOf(name.getMethodName()),
+      5,
+      1,
+      1,
+      1,
+      (b) -> {
+        try {
+          TimeUnit.MILLISECONDS.sleep(500);
+        } catch (InterruptedException ex) {
+        }
+      });
   }
 
   private void testAsyncScanner(TableName table, int rowNumber, int 
familyNumber,
-      int qualifierNumber) throws Exception {
+      int qualifierNumber, int caching, Consumer<Boolean> listener) throws 
Exception {
     assert rowNumber > 0;
     assert familyNumber > 0;
     assert qualifierNumber > 0;
@@ -707,23 +727,33 @@ public class TestScannersFromClientSide {
 
     Scan scan = new Scan();
     scan.setAsyncPrefetch(true);
-    ResultScanner scanner = ht.getScanner(scan);
-    List<Cell> kvListScan = new ArrayList<>();
-    Result result;
-    boolean first = true;
-    while ((result = scanner.next()) != null) {
-      // waiting for cache. see HBASE-17376
-      if (first) {
-        TimeUnit.SECONDS.sleep(1);
-        first = false;
-      }
-      for (Cell kv : result.listCells()) {
-        kvListScan.add(kv);
+    if (caching > 0) {
+      scan.setCaching(caching);
+    }
+    try (ResultScanner scanner = ht.getScanner(scan)) {
+      assertTrue("Not instance of async scanner",scanner instanceof 
ClientAsyncPrefetchScanner);
+      ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
+      List<Cell> kvListScan = new ArrayList<>();
+      Result result;
+      boolean first = true;
+      int actualRows = 0;
+      while ((result = scanner.next()) != null) {
+        ++actualRows;
+        // waiting for cache. see HBASE-17376
+        if (first) {
+          TimeUnit.SECONDS.sleep(1);
+          first = false;
+        }
+        for (Cell kv : result.listCells()) {
+          kvListScan.add(kv);
+        }
       }
+      assertEquals(rowNumber, actualRows);
+      // These cells may have different rows but it is ok. The Result#getRow
+      // isn't used in the verifyResult()
+      result = Result.create(kvListScan);
+      verifyResult(result, kvListExp, toLog, "Testing async scan");
     }
-    result = Result.create(kvListScan);
-    assertTrue("Not instance of async scanner",scanner instanceof 
ClientAsyncPrefetchScanner);
-    verifyResult(result, kvListExp, toLog, "Testing async scan");
     TEST_UTIL.deleteTable(table);
   }
 

Reply via email to