HBASE-17740 Correct the semantic of batch and partial for async client

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1849e8a5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1849e8a5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1849e8a5

Branch: refs/heads/hbase-12439
Commit: 1849e8a5a77373b5fb8e354c3f20214a80eb8c1a
Parents: 0ecb678
Author: zhangduo <zhang...@apache.org>
Authored: Wed Mar 15 18:26:51 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Mar 16 09:44:23 2017 +0800

----------------------------------------------------------------------
 .../client/AllowPartialScanResultCache.java     |  31 ++-
 .../hadoop/hbase/client/AsyncClientScanner.java |   4 +-
 .../hbase/client/BatchScanResultCache.java      | 142 +++++++++++
 .../hadoop/hbase/client/ClientScanner.java      | 253 +------------------
 .../hadoop/hbase/client/ConnectionUtils.java    |  14 +
 .../org/apache/hadoop/hbase/client/Result.java  |  72 +++---
 .../client/TestAllowPartialScanResultCache.java |  33 ++-
 .../hbase/client/TestBatchScanResultCache.java  | 113 +++++++++
 .../TestCompleteResultScanResultCache.java      |   5 +-
 .../client/TestRawAsyncTablePartialScan.java    | 119 +++++++++
 10 files changed, 471 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
index caecfd4..82f1ea0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -36,10 +38,6 @@ class AllowPartialScanResultCache implements ScanResultCache 
{
   // beginning of a row when retry.
   private Cell lastCell;
 
-  private Result filterCells(Result result) {
-    return lastCell == null ? result : ConnectionUtils.filterCells(result, 
lastCell);
-  }
-
   private void updateLastCell(Result result) {
     lastCell = result.rawCells()[result.rawCells().length - 1];
   }
@@ -49,22 +47,23 @@ class AllowPartialScanResultCache implements 
ScanResultCache {
     if (results.length == 0) {
       return EMPTY_RESULT_ARRAY;
     }
-    Result first = filterCells(results[0]);
-    if (results.length == 1) {
-      if (first == null) {
-        // do not update last cell if we filter out all cells
-        return EMPTY_RESULT_ARRAY;
+    int i;
+    for (i = 0; i < results.length; i++) {
+      Result r = filterCells(results[i], lastCell);
+      if (r != null) {
+        results[i] = r;
+        break;
       }
-      updateLastCell(results[0]);
-      results[0] = first;
-      return results;
+    }
+    if (i == results.length) {
+      return EMPTY_RESULT_ARRAY;
     }
     updateLastCell(results[results.length - 1]);
-    if (first == null) {
-      return Arrays.copyOfRange(results, 1, results.length);
+    if (i > 0) {
+      return Arrays.copyOfRange(results, i, results.length);
+    } else {
+      return results;
     }
-    results[0] = first;
-    return results;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index 2215d36..fa7aa81 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static 
org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
 
 import java.io.IOException;
@@ -86,8 +87,7 @@ class AsyncClientScanner {
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;
-    this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0
-        ? new AllowPartialScanResultCache() : new CompleteScanResultCache();
+    this.resultCache = createScanResultCache(scan);
   }
 
   private static final class OpenScannerResponse {

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
new file mode 100644
index 0000000..9ab959b
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A scan result cache for batched scan, i.e,
+ * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
+ * <p>
+ * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 
5+5+5+5+1 to user. setBatch
+ * doesn't mean setAllowPartialResult(true).
+ */
+@InterfaceAudience.Private
+public class BatchScanResultCache implements ScanResultCache {
+
+  private final int batch;
+
+  // used to filter out the cells that already returned to user as we always 
start from the
+  // beginning of a row when retry.
+  private Cell lastCell;
+
+  private final Deque<Result> partialResults = new ArrayDeque<>();
+
+  private int numCellsOfPartialResults;
+
+  public BatchScanResultCache(int batch) {
+    this.batch = batch;
+  }
+
+  private void updateLastCell(Result result) {
+    lastCell = result.rawCells()[result.rawCells().length - 1];
+  }
+
+  private Result createCompletedResult() throws IOException {
+    Result result = Result.createCompleteResult(partialResults);
+    partialResults.clear();
+    numCellsOfPartialResults = 0;
+    return result;
+  }
+
+  // Add new result to the partial list and return a batched Result if caching 
size exceed batching
+  // limit. As the RS will also respect the scan.getBatch, we can make sure 
that we will get only
+  // one Result back at most(or null, which means we do not have enough cells).
+  private Result regroupResults(Result result) {
+    partialResults.addLast(result);
+    numCellsOfPartialResults += result.size();
+    if (numCellsOfPartialResults < batch) {
+      return null;
+    }
+    Cell[] cells = new Cell[batch];
+    int cellCount = 0;
+    boolean stale = false;
+    for (;;) {
+      Result r = partialResults.pollFirst();
+      stale = stale || r.isStale();
+      int newCellCount = cellCount + r.size();
+      if (newCellCount > batch) {
+        // We have more cells than expected, so split the current result
+        int len = batch - cellCount;
+        System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
+        Cell[] remainingCells = new Cell[r.size() - len];
+        System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len);
+        partialResults.addFirst(
+          Result.create(remainingCells, r.getExists(), r.isStale(), 
r.mayHaveMoreCellsInRow()));
+        break;
+      }
+      System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
+      if (newCellCount == batch) {
+        break;
+      }
+      cellCount = newCellCount;
+    }
+    numCellsOfPartialResults -= batch;
+    return Result.create(cells, null, stale,
+      result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) 
throws IOException {
+    if (results.length == 0) {
+      if (!partialResults.isEmpty() && !isHeartbeatMessage) {
+        return new Result[] { createCompletedResult() };
+      }
+      return EMPTY_RESULT_ARRAY;
+    }
+    List<Result> regroupedResults = new ArrayList<>();
+    for (Result result : results) {
+      result = filterCells(result, lastCell);
+      if (result == null) {
+        continue;
+      }
+      // check if we have a row change
+      if (!partialResults.isEmpty() &&
+          !Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
+        regroupedResults.add(createCompletedResult());
+      }
+      Result regroupedResult = regroupResults(result);
+      if (regroupedResult != null) {
+        regroupedResults.add(regroupedResult);
+        // only update last cell when we actually return it to user.
+        updateLastCell(regroupedResult);
+      }
+      if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
+        // We are done for this row
+        regroupedResults.add(createCompletedResult());
+      }
+    }
+    return regroupedResults.toArray(new Result[0]);
+  }
+
+  @Override
+  public void clear() {
+    partialResults.clear();
+    numCellsOfPartialResults = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index bd3d4ef..a8b029f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -18,16 +18,15 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
+import static 
org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 
@@ -35,8 +34,6 @@ import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -69,24 +66,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
   protected HRegionInfo currentRegion = null;
   protected ScannerCallableWithReplicas callable = null;
   protected Queue<Result> cache;
-  /**
-   * A list of partial results that have been returned from the server. This 
list should only
-   * contain results if this scanner does not have enough partial results to 
form the complete
-   * result.
-   */
-  protected int partialResultsCellSizes = 0;
-  protected final LinkedList<Result> partialResults = new LinkedList<>();
-
-  /**
-   * The row for which we are accumulating partial Results (i.e. the row of 
the Results stored
-   * inside partialResults). Changes to partialResultsRow and partialResults 
are kept in sync via
-   * the methods {@link #regroupResults(Result)} and {@link 
#clearPartialResults()}
-   */
-  protected byte[] partialResultsRow = null;
-  /**
-   * The last cell from a not full Row which is added to cache
-   */
-  protected Cell lastCellLoadedToCache = null;
+  private final ScanResultCache scanResultCache;
   protected final int caching;
   protected long lastNext;
   // Keep lastResult returned successfully in case we have to reset scanner.
@@ -159,6 +139,8 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     this.rpcControllerFactory = controllerFactory;
 
     this.conf = conf;
+
+    this.scanResultCache = createScanResultCache(scan);
     initCache();
   }
 
@@ -356,14 +338,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
 
   private void closeScannerIfExhausted(boolean exhausted) throws IOException {
     if (exhausted) {
-      if (!partialResults.isEmpty()) {
-        // XXX: continue if there are partial results. But in fact server 
should not set
-        // hasMoreResults to false if there are partial results.
-        LOG.warn("Server tells us there is no more results for this region but 
we still have" +
-            " partialResults, this should not happen, retry on the current 
scanner anyway");
-      } else {
-        closeScanner();
-      }
+      closeScanner();
     }
   }
 
@@ -371,7 +346,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
       MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws 
DoNotRetryIOException {
     // An exception was thrown which makes any partial results that we were 
collecting
     // invalid. The scanner will need to be reset to the beginning of a row.
-    clearPartialResults();
+    scanResultCache.clear();
 
     // Unfortunately, DNRIOE is used in two different semantics.
     // (1) The first is to close the client scanner and bubble up the 
exception all the way
@@ -465,7 +440,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
         if (callable.switchedToADifferentReplica()) {
           // Any accumulated partial results are no longer valid since the 
callable will
           // openScanner with the correct startkey and we must pick up from 
there
-          clearPartialResults();
+          scanResultCache.clear();
           this.currentRegion = callable.getHRegionInfo();
         }
         retryAfterOutOfOrderException.setValue(true);
@@ -485,29 +460,19 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
       // Groom the array of Results that we received back from the server 
before adding that
       // Results to the scanner's cache. If partial results are not allowed to 
be seen by the
       // caller, all book keeping will be performed within this method.
-      List<Result> resultsToAddToCache =
-          getResultsToAddToCache(values, callable.isHeartbeatMessage());
-      if (!resultsToAddToCache.isEmpty()) {
+      Result[] resultsToAddToCache = scanResultCache.addAndGet(values, 
callable.isHeartbeatMessage());
+      if (resultsToAddToCache.length > 0) {
         for (Result rs : resultsToAddToCache) {
-          rs = filterLoadedCell(rs);
-          if (rs == null) {
-            continue;
-          }
-
           cache.add(rs);
           long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
           countdown--;
           remainingResultSize -= estimatedHeapSizeOfResult;
           addEstimatedSize(estimatedHeapSizeOfResult);
           this.lastResult = rs;
-          if (this.lastResult.mayHaveMoreCellsInRow()) {
-            updateLastCellLoadedToCache(this.lastResult);
-          } else {
-            this.lastCellLoadedToCache = null;
-          }
         }
-        if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
-          int newLimit = scan.getLimit() - 
numberOfIndividualRows(resultsToAddToCache);
+        if (scan.getLimit() > 0) {
+          int newLimit =
+              scan.getLimit() - 
numberOfIndividualRows(Arrays.asList(resultsToAddToCache));
           assert newLimit >= 0;
           scan.setLimit(newLimit);
         }
@@ -550,13 +515,6 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
       }
       // we are done with the current region
       if (regionExhausted) {
-        if (!partialResults.isEmpty()) {
-          // XXX: continue if there are partial results. But in fact server 
should not set
-          // hasMoreResults to false if there are partial results.
-          LOG.warn("Server tells us there is no more results for this region 
but we still have" +
-              " partialResults, this should not happen, retry on the current 
scanner anyway");
-          continue;
-        }
         if (!moveToNextRegion()) {
           break;
         }
@@ -573,142 +531,6 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     return cache != null ? cache.size() : 0;
   }
 
-  /**
-   * This method ensures all of our book keeping regarding partial results is 
kept up to date. This
-   * method should be called once we know that the results we received back 
from the RPC request do
-   * not contain errors. We return a list of results that should be added to 
the cache. In general,
-   * this list will contain all NON-partial results from the input array 
(unless the client has
-   * specified that they are okay with receiving partial results)
-   * @param resultsFromServer The array of {@link Result}s returned from the 
server
-   * @param heartbeatMessage Flag indicating whether or not the response 
received from the server
-   *          represented a complete response, or a heartbeat message that was 
sent to keep the
-   *          client-server connection alive
-   * @return the list of results that should be added to the cache.
-   * @throws IOException
-   */
-  protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
-      boolean heartbeatMessage) throws IOException {
-    int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
-    List<Result> resultsToAddToCache = new ArrayList<>(resultSize);
-
-    // If the caller has indicated in their scan that they are okay with 
seeing partial results,
-    // then simply add all results to the list. Note allowPartial and setBatch 
are not same, we can
-    // return here if allow partials and we will handle batching later.
-    if (scan.getAllowPartialResults()) {
-      addResultsToList(resultsToAddToCache, resultsFromServer, 0,
-        (null == resultsFromServer ? 0 : resultsFromServer.length));
-      return resultsToAddToCache;
-    }
-
-    // If no results were returned it indicates that either we have the all 
the partial results
-    // necessary to construct the complete result or the server had to send a 
heartbeat message
-    // to the client to keep the client-server connection alive
-    if (resultsFromServer == null || resultsFromServer.length == 0) {
-      // If this response was an empty heartbeat message, then we have not 
exhausted the region
-      // and thus there may be more partials server side that still need to be 
added to the partial
-      // list before we form the complete Result
-      if (!partialResults.isEmpty() && !heartbeatMessage) {
-        resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        clearPartialResults();
-      }
-
-      return resultsToAddToCache;
-    }
-
-    for(Result result : resultsFromServer) {
-      if (partialResultsRow != null && Bytes.compareTo(result.getRow(), 
partialResultsRow) != 0) {
-        // We have a new row, complete the previous row.
-        resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        clearPartialResults();
-      }
-      Result res = regroupResults(result);
-      if (res != null) {
-        resultsToAddToCache.add(res);
-      }
-      if (!result.mayHaveMoreCellsInRow()) {
-        // We are done for this row
-        if (partialResultsCellSizes > 0) {
-          resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        }
-        clearPartialResults();
-      }
-    }
-
-
-    return resultsToAddToCache;
-  }
-
-  /**
-   * Add new result to the partial list and return a batched Result if caching 
size exceed
-   * batching limit.
-   * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 
5+5+5+5+1 to user.
-   * setBatch doesn't mean setAllowPartialResult(true)
-   * @param result The result that we want to add to our list of partial 
Results
-   * @return the result if we have batch limit and there is one Result can be 
returned to user, or
-   *         null if we have not.
-   * @throws IOException
-   */
-  private Result regroupResults(final Result result) throws IOException {
-    partialResultsRow = result.getRow();
-    partialResults.add(result);
-    partialResultsCellSizes += result.size();
-    if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) {
-      Cell[] cells = new Cell[scan.getBatch()];
-      int count = 0;
-      boolean stale = false;
-      while (count < scan.getBatch()) {
-        Result res = partialResults.poll();
-        stale = stale || res.isStale();
-        if (res.size() + count <= scan.getBatch()) {
-          System.arraycopy(res.rawCells(), 0, cells, count, res.size());
-          count += res.size();
-        } else {
-          int len = scan.getBatch() - count;
-          System.arraycopy(res.rawCells(), 0, cells, count, len);
-          Cell[] remainingCells = new Cell[res.size() - len];
-          System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() 
- len);
-          Result remainingRes = Result.create(remainingCells, res.getExists(), 
res.isStale(),
-              res.mayHaveMoreCellsInRow());
-          partialResults.addFirst(remainingRes);
-          count = scan.getBatch();
-        }
-      }
-      partialResultsCellSizes -= scan.getBatch();
-      if (partialResultsCellSizes == 0) {
-        // We have nothing in partialResults, clear the flags to prevent 
returning empty Result
-        // when next result belongs to the next row.
-        clearPartialResults();
-      }
-      return Result.create(cells, null, stale,
-          partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow());
-    }
-    return null;
-  }
-
-  /**
-   * Convenience method for clearing the list of partials and resetting the 
partialResultsRow.
-   */
-  private void clearPartialResults() {
-    partialResults.clear();
-    partialResultsCellSizes = 0;
-    partialResultsRow = null;
-  }
-
-  /**
-   * Helper method for adding results between the indices [start, end) to the 
outputList
-   * @param outputList the list that results will be added to
-   * @param inputArray the array that results are taken from
-   * @param start beginning index (inclusive)
-   * @param end ending index (exclusive)
-   */
-  private void addResultsToList(List<Result> outputList, Result[] inputArray, 
int start, int end) {
-    if (inputArray == null || start < 0 || end > inputArray.length) return;
-
-    for (int i = start; i < end; i++) {
-      outputList.add(inputArray[i]);
-    }
-  }
-
   @Override
   public void close() {
     if (!scanMetricsPublished) writeScanMetrics();
@@ -749,57 +571,6 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     return false;
   }
 
-  protected void updateLastCellLoadedToCache(Result result) {
-    if (result.rawCells().length == 0) {
-      return;
-    }
-    this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 
1];
-  }
-
-  /**
-   * Compare two Cells considering reversed scanner. ReversedScanner only 
reverses rows, not
-   * columns.
-   */
-  private int compare(Cell a, Cell b) {
-    CellComparator comparator = currentRegion != null && 
currentRegion.isMetaRegion()
-        ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
-    int r = comparator.compareRows(a, b);
-    if (r != 0) {
-      return this.scan.isReversed() ? -r : r;
-    }
-    return CellComparator.compareWithoutRow(a, b);
-  }
-
-  private Result filterLoadedCell(Result result) {
-    // we only filter result when last result is partial
-    // so lastCellLoadedToCache and result should have same row key.
-    // However, if 1) read some cells; 1.1) delete this row at the same time 
2) move region;
-    // 3) read more cell. lastCellLoadedToCache and result will be not at same 
row.
-    if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
-      return result;
-    }
-    if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
-      // The first cell of this result is larger than the last cell of 
loadcache.
-      // If user do not allow partial result, it must be true.
-      return result;
-    }
-    if (compare(this.lastCellLoadedToCache, 
result.rawCells()[result.rawCells().length - 1]) >= 0) {
-      // The last cell of this result is smaller than the last cell of 
loadcache, skip all.
-      return null;
-    }
-
-    // The first one must not in filtered result, we start at the second.
-    int index = 1;
-    while (index < result.rawCells().length) {
-      if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
-        break;
-      }
-      index++;
-    }
-    Cell[] list = Arrays.copyOfRange(result.rawCells(), index, 
result.rawCells().length);
-    return Result.create(list, result.getExists(), result.isStale(), 
result.mayHaveMoreCellsInRow());
-  }
-
   protected void initCache() {
     initSyncCache();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 2b75836..3e7cd00 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -316,6 +316,10 @@ public final class ConnectionUtils {
   }
 
   static Result filterCells(Result result, Cell keepCellsAfter) {
+    if (keepCellsAfter == null) {
+      // do not need to filter
+      return result;
+    }
     // not the same row
     if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, 
result.getRow().length)) {
       return result;
@@ -410,4 +414,14 @@ public final class ConnectionUtils {
   public static int numberOfIndividualRows(List<Result> results) {
     return (int) results.stream().filter(r -> 
!r.mayHaveMoreCellsInRow()).count();
   }
+
+  public static ScanResultCache createScanResultCache(Scan scan) {
+    if (scan.getAllowPartialResults()) {
+      return new AllowPartialScanResultCache();
+    } else if (scan.getBatch() > 0) {
+      return new BatchScanResultCache(scan.getBatch());
+    } else {
+      return new CompleteScanResultCache();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 4752d70..f8682ec 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -24,7 +24,9 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -145,11 +147,11 @@ public class Result implements CellScannable, CellScanner 
{
   }
 
   public static Result create(List<Cell> cells, Boolean exists, boolean stale,
-      boolean hasMoreCellsInRow) {
+      boolean mayHaveMoreCellsInRow) {
     if (exists != null){
-      return new Result(null, exists, stale, hasMoreCellsInRow);
+      return new Result(null, exists, stale, mayHaveMoreCellsInRow);
     }
-    return new Result(cells.toArray(new Cell[cells.size()]), null, stale, 
hasMoreCellsInRow);
+    return new Result(cells.toArray(new Cell[cells.size()]), null, stale, 
mayHaveMoreCellsInRow);
   }
 
   /**
@@ -792,44 +794,42 @@ public class Result implements CellScannable, CellScanner 
{
    * @throws IOException A complete result cannot be formed because the 
results in the partial list
    *           come from different rows
    */
-  public static Result createCompleteResult(List<Result> partialResults)
+  public static Result createCompleteResult(Iterable<Result> partialResults)
       throws IOException {
+    if (partialResults == null) {
+      return Result.create(Collections.emptyList(), null, false);
+    }
     List<Cell> cells = new ArrayList<>();
     boolean stale = false;
     byte[] prevRow = null;
     byte[] currentRow = null;
-
-    if (partialResults != null && !partialResults.isEmpty()) {
-      for (int i = 0; i < partialResults.size(); i++) {
-        Result r = partialResults.get(i);
-        currentRow = r.getRow();
-        if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
-          throw new IOException(
-              "Cannot form complete result. Rows of partial results do not 
match." +
-                  " Partial Results: " + partialResults);
-        }
-
-        // Ensure that all Results except the last one are marked as partials. 
The last result
-        // may not be marked as a partial because Results are only marked as 
partials when
-        // the scan on the server side must be stopped due to reaching the 
maxResultSize.
-        // Visualizing it makes it easier to understand:
-        // maxResultSize: 2 cells
-        // (-x-) represents cell number x in a row
-        // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
-        // How row1 will be returned by the server as partial Results:
-        // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
-        // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
-        // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
-        if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
-          throw new IOException(
-              "Cannot form complete result. Result is missing partial flag. " +
-                  "Partial Results: " + partialResults);
-        }
-        prevRow = currentRow;
-        stale = stale || r.isStale();
-        for (Cell c : r.rawCells()) {
-          cells.add(c);
-        }
+    for (Iterator<Result> iter = partialResults.iterator(); iter.hasNext();) {
+      Result r = iter.next();
+      currentRow = r.getRow();
+      if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
+        throw new IOException(
+            "Cannot form complete result. Rows of partial results do not 
match." +
+                " Partial Results: " + partialResults);
+      }
+      // Ensure that all Results except the last one are marked as partials. 
The last result
+      // may not be marked as a partial because Results are only marked as 
partials when
+      // the scan on the server side must be stopped due to reaching the 
maxResultSize.
+      // Visualizing it makes it easier to understand:
+      // maxResultSize: 2 cells
+      // (-x-) represents cell number x in a row
+      // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+      // How row1 will be returned by the server as partial Results:
+      // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+      // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+      // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+      if (iter.hasNext() && !r.mayHaveMoreCellsInRow()) {
+        throw new IOException("Cannot form complete result. Result is missing 
partial flag. " +
+            "Partial Results: " + partialResults);
+      }
+      prevRow = currentRow;
+      stale = stale || r.isStale();
+      for (Cell c : r.rawCells()) {
+        cells.add(c);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
index fc5ba14..3fe43a5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.*;
+import static 
org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.stream.IntStream;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -51,10 +51,6 @@ public class TestAllowPartialScanResultCache {
     resultCache = null;
   }
 
-  private static Cell createCell(int key, int cq) {
-    return new KeyValue(Bytes.toBytes(key), CF, Bytes.toBytes("cq" + cq), 
Bytes.toBytes(key));
-  }
-
   @Test
   public void test() throws IOException {
     assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
@@ -62,31 +58,34 @@ public class TestAllowPartialScanResultCache {
     assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
       resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
 
-    Cell[] cells1 = IntStream.range(0, 10).mapToObj(i -> createCell(1, 
i)).toArray(Cell[]::new);
-    Cell[] cells2 = IntStream.range(0, 10).mapToObj(i -> createCell(2, 
i)).toArray(Cell[]::new);
+    Cell[] cells1 = createCells(CF, 1, 10);
+    Cell[] cells2 = createCells(CF, 2, 10);
 
     Result[] results1 = resultCache.addAndGet(
       new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, 
true) }, false);
     assertEquals(1, results1.length);
     assertEquals(1, Bytes.toInt(results1[0].getRow()));
     assertEquals(5, results1[0].rawCells().length);
-    IntStream.range(0, 5).forEach(
-      i -> assertEquals(1, Bytes.toInt(results1[0].getValue(CF, 
Bytes.toBytes("cq" + i)))));
+    for (int i = 0; i < 5; i++) {
+      assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" 
+ i))));
+    }
 
     Result[] results2 = resultCache.addAndGet(
       new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, 
false, true) }, false);
     assertEquals(1, results2.length);
     assertEquals(1, Bytes.toInt(results2[0].getRow()));
     assertEquals(5, results2[0].rawCells().length);
-    IntStream.range(5, 10).forEach(
-      i -> assertEquals(1, Bytes.toInt(results2[0].getValue(CF, 
Bytes.toBytes("cq" + i)))));
+    for (int i = 5; i < 10; i++) {
+      assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" 
+ i))));
+    }
 
-    Result[] results3 = resultCache
-        .addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) 
}, false);
+    Result[] results3 =
+        resultCache.addAndGet(new Result[] { Result.create(cells1), 
Result.create(cells2) }, false);
     assertEquals(1, results3.length);
     assertEquals(2, Bytes.toInt(results3[0].getRow()));
     assertEquals(10, results3[0].rawCells().length);
-    IntStream.range(0, 10).forEach(
-      i -> assertEquals(2, Bytes.toInt(results3[0].getValue(CF, 
Bytes.toBytes("cq" + i)))));
+    for (int i = 0; i < 10; i++) {
+      assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" 
+ i))));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
new file mode 100644
index 0000000..31a4594
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestBatchScanResultCache {
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private BatchScanResultCache resultCache;
+
+  @Before
+  public void setUp() {
+    resultCache = new BatchScanResultCache(4);
+  }
+
+  @After
+  public void tearDown() {
+    resultCache.clear();
+    resultCache = null;
+  }
+
+  static Cell createCell(byte[] cf, int key, int cq) {
+    return new KeyValue(Bytes.toBytes(key), cf, Bytes.toBytes("cq" + cq), 
Bytes.toBytes(key));
+  }
+
+  static Cell[] createCells(byte[] cf, int key, int numCqs) {
+    Cell[] cells = new Cell[numCqs];
+    for (int i = 0; i < numCqs; i++) {
+      cells[i] = createCell(cf, key, i);
+    }
+    return cells;
+  }
+
+  private void assertResultEquals(Result result, int key, int start, int to) {
+    assertEquals(to - start, result.size());
+    for (int i = start; i < to; i++) {
+      assertEquals(key, Bytes.toInt(result.getValue(CF, Bytes.toBytes("cq" + 
i))));
+    }
+    assertEquals(to - start == 4, result.mayHaveMoreCellsInRow());
+  }
+
+  @Test
+  public void test() throws IOException {
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+
+    Cell[] cells1 = createCells(CF, 1, 10);
+    Cell[] cells2 = createCells(CF, 2, 10);
+    Cell[] cells3 = createCells(CF, 3, 10);
+    assertEquals(0, resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, 
true) }, false).length);
+    Result[] results = resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, 
false, true),
+          Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) 
},
+      false);
+    assertEquals(2, results.length);
+    assertResultEquals(results[0], 1, 0, 4);
+    assertResultEquals(results[1], 1, 4, 8);
+    results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+    assertEquals(1, results.length);
+    assertResultEquals(results[0], 1, 8, 10);
+
+    results = resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, 
false, true),
+          Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true),
+          Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 0, 4), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) 
},
+      false);
+    assertEquals(6, results.length);
+    assertResultEquals(results[0], 2, 0, 4);
+    assertResultEquals(results[1], 2, 4, 8);
+    assertResultEquals(results[2], 2, 8, 10);
+    assertResultEquals(results[3], 3, 0, 4);
+    assertResultEquals(results[4], 3, 4, 8);
+    assertResultEquals(results[5], 3, 8, 10);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
index a340e9f..8759593 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertSame;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.stream.IntStream;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
@@ -70,9 +69,9 @@ public class TestCompleteResultScanResultCache {
       resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
     int count = 10;
     Result[] results = new Result[count];
-    IntStream.range(0, count).forEach(i -> {
+    for (int i = 0; i < count; i++) {
       results[i] = Result.create(Arrays.asList(createCell(i, CQ1)));
-    });
+    }
     assertSame(results, resultCache.addAndGet(results, false));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
new file mode 100644
index 0000000..2a32206
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestRawAsyncTablePartialScan {
+
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[][] CQS =
+      new byte[][] { Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), 
Bytes.toBytes("cq3") };
+
+  private static int COUNT = 100;
+
+  private static AsyncConnection CONN;
+
+  private static RawAsyncTable TABLE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    TABLE = CONN.getRawTable(TABLE_NAME);
+    TABLE
+        .putAll(IntStream.range(0, COUNT)
+            .mapToObj(i -> new Put(Bytes.toBytes(String.format("%02d", i)))
+                .addColumn(FAMILY, CQS[0], Bytes.toBytes(i))
+                .addColumn(FAMILY, CQS[1], Bytes.toBytes(2 * i))
+                .addColumn(FAMILY, CQS[2], Bytes.toBytes(3 * i)))
+            .collect(Collectors.toList()))
+        .get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBatchDoNotAllowPartial() throws InterruptedException, 
ExecutionException {
+    // we set batch to 2 and max result size to 1, then server will only 
returns one result per call
+    // but we should get 2 + 1 for every row.
+    List<Result> results = TABLE.scanAll(new 
Scan().setBatch(2).setMaxResultSize(1)).get();
+    assertEquals(2 * COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      Result firstTwo = results.get(2 * i);
+      assertEquals(String.format("%02d", i), 
Bytes.toString(firstTwo.getRow()));
+      assertEquals(2, firstTwo.size());
+      assertEquals(i, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[0])));
+      assertEquals(2 * i, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[1])));
+
+      Result secondOne = results.get(2 * i + 1);
+      assertEquals(String.format("%02d", i), 
Bytes.toString(secondOne.getRow()));
+      assertEquals(1, secondOne.size());
+      assertEquals(3 * i, Bytes.toInt(secondOne.getValue(FAMILY, CQS[2])));
+    }
+  }
+
+  @Test
+  public void testReversedBatchDoNotAllowPartial() throws 
InterruptedException, ExecutionException {
+    // we set batch to 2 and max result size to 1, then server will only 
returns one result per call
+    // but we should get 2 + 1 for every row.
+    List<Result> results =
+        TABLE.scanAll(new 
Scan().setBatch(2).setMaxResultSize(1).setReversed(true)).get();
+    assertEquals(2 * COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      int row = COUNT - i - 1;
+      Result firstTwo = results.get(2 * i);
+      assertEquals(String.format("%02d", row), 
Bytes.toString(firstTwo.getRow()));
+      assertEquals(2, firstTwo.size());
+      assertEquals(row, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[0])));
+      assertEquals(2 * row, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[1])));
+
+      Result secondOne = results.get(2 * i + 1);
+      assertEquals(String.format("%02d", row), 
Bytes.toString(secondOne.getRow()));
+      assertEquals(1, secondOne.size());
+      assertEquals(3 * row, Bytes.toInt(secondOne.getValue(FAMILY, CQS[2])));
+    }
+  }
+}

Reply via email to