HBASE-17723 ClientAsyncPrefetchScanner may end prematurely when the size of the cache is one
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0ecb6782 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0ecb6782 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0ecb6782 Branch: refs/heads/hbase-12439 Commit: 0ecb6782593039af75a45c25481f1dbf7cbd6928 Parents: a49bc58 Author: Chia-Ping Tsai <chia7...@gmail.com> Authored: Sun Mar 12 13:48:12 2017 +0800 Committer: Chia-Ping Tsai <chia7...@gmail.com> Committed: Thu Mar 16 03:07:20 2017 +0800 ---------------------------------------------------------------------- .../client/ClientAsyncPrefetchScanner.java | 61 +++++++++++------- .../client/TestScannersFromClientSide.java | 66 ++++++++++++++------ 2 files changed, 88 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0ecb6782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index b1fc2da..007e638 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; import java.io.IOException; @@ -26,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; @@ -62,6 +64,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { private AtomicBoolean prefetchRunning; // an attribute for synchronizing close between scanner and prefetch threads private AtomicLong closingThreadId; + // used for testing + private Consumer<Boolean> prefetchListener; private static final int NO_THREAD = -1; public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, @@ -72,6 +76,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { replicaCallTimeoutMicroSecondScan); } + @VisibleForTesting + void setPrefetchListener(Consumer<Boolean> prefetchListener) { + this.prefetchListener = prefetchListener; + } + @Override protected void initCache() { // concurrent cache @@ -88,34 +97,39 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { public Result next() throws IOException { try { - handleException(); + boolean hasExecutedPrefetch = false; + do { + handleException(); - // If the scanner is closed and there's nothing left in the cache, next is a no-op. - if (getCacheCount() == 0 && this.closed) { - return null; - } - if (prefetchCondition()) { - // run prefetch in the background only if no prefetch is already running - if (!isPrefetchRunning()) { - if (prefetchRunning.compareAndSet(false, true)) { - getPool().execute(prefetchRunnable); + // If the scanner is closed and there's nothing left in the cache, next is a no-op. + if (getCacheCount() == 0 && this.closed) { + return null; + } + + if (prefetchCondition()) { + // run prefetch in the background only if no prefetch is already running + if (!isPrefetchRunning()) { + if (prefetchRunning.compareAndSet(false, true)) { + getPool().execute(prefetchRunnable); + hasExecutedPrefetch = true; + } + } + } + + while (isPrefetchRunning()) { + // prefetch running or still pending + if (getCacheCount() > 0) { + return pollCache(); + } else { + // (busy) wait for a record - sleep + Threads.sleep(1); } } - } - while (isPrefetchRunning()) { - // prefetch running or still pending if (getCacheCount() > 0) { return pollCache(); - } else { - // (busy) wait for a record - sleep - Threads.sleep(1); } - } - - if (getCacheCount() > 0) { - return pollCache(); - } + } while (!hasExecutedPrefetch); // if we exhausted this scanner before calling close, write out the scan metrics writeScanMetrics(); @@ -219,11 +233,16 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { @Override public void run() { + boolean succeed = false; try { loadCache(); + succeed = true; } catch (Exception e) { exceptionsQueue.add(e); } finally { + if (prefetchListener != null) { + prefetchListener.accept(succeed); + } prefetchRunning.set(false); if(closed) { if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { http://git-wip-us.apache.org/repos/asf/hbase/blob/0ecb6782/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 6f40093..e5c19ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.commons.logging.Log; @@ -656,7 +657,9 @@ public class TestScannersFromClientSide { testAsyncScanner(TableName.valueOf(name.getMethodName()), 2, 3, - 10); + 10, + -1, + null); } @Test @@ -664,11 +667,28 @@ public class TestScannersFromClientSide { testAsyncScanner(TableName.valueOf(name.getMethodName()), 30000, 1, - 1); + 1, + -1, + null); + } + + @Test + public void testAsyncScannerWithoutCaching() throws Exception { + testAsyncScanner(TableName.valueOf(name.getMethodName()), + 5, + 1, + 1, + 1, + (b) -> { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException ex) { + } + }); } private void testAsyncScanner(TableName table, int rowNumber, int familyNumber, - int qualifierNumber) throws Exception { + int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception { assert rowNumber > 0; assert familyNumber > 0; assert qualifierNumber > 0; @@ -707,23 +727,33 @@ public class TestScannersFromClientSide { Scan scan = new Scan(); scan.setAsyncPrefetch(true); - ResultScanner scanner = ht.getScanner(scan); - List<Cell> kvListScan = new ArrayList<>(); - Result result; - boolean first = true; - while ((result = scanner.next()) != null) { - // waiting for cache. see HBASE-17376 - if (first) { - TimeUnit.SECONDS.sleep(1); - first = false; - } - for (Cell kv : result.listCells()) { - kvListScan.add(kv); + if (caching > 0) { + scan.setCaching(caching); + } + try (ResultScanner scanner = ht.getScanner(scan)) { + assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); + ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener); + List<Cell> kvListScan = new ArrayList<>(); + Result result; + boolean first = true; + int actualRows = 0; + while ((result = scanner.next()) != null) { + ++actualRows; + // waiting for cache. see HBASE-17376 + if (first) { + TimeUnit.SECONDS.sleep(1); + first = false; + } + for (Cell kv : result.listCells()) { + kvListScan.add(kv); + } } + assertEquals(rowNumber, actualRows); + // These cells may have different rows but it is ok. The Result#getRow + // isn't used in the verifyResult() + result = Result.create(kvListScan); + verifyResult(result, kvListExp, toLog, "Testing async scan"); } - result = Result.create(kvListScan); - assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); - verifyResult(result, kvListExp, toLog, "Testing async scan"); TEST_UTIL.deleteTable(table); }