HBASE-15576 Scanning cursor to prevent blocking long time on 
ResultScanner.next()


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2f1923a8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2f1923a8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2f1923a8

Branch: refs/heads/master
Commit: 2f1923a8233b0c999494cd4b33f85b70dc5d7b12
Parents: 80e15aa
Author: Phil Yang <yangzhe1...@apache.org>
Authored: Thu May 25 15:18:58 2017 +0800
Committer: Phil Yang <yangzhe1...@apache.org>
Committed: Wed Jun 7 11:32:04 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      |   15 +
 .../apache/hadoop/hbase/client/ClientUtil.java  |    4 +
 .../org/apache/hadoop/hbase/client/Cursor.java  |   41 +
 .../org/apache/hadoop/hbase/client/Result.java  |   45 +
 .../org/apache/hadoop/hbase/client/Scan.java    |   43 +
 .../hadoop/hbase/client/ScannerCallable.java    |   14 +-
 .../client/ScannerCallableWithReplicas.java     |    4 +
 .../hbase/shaded/protobuf/ProtobufUtil.java     |   24 +
 .../shaded/protobuf/generated/ClientProtos.java | 1172 +++++++++++++++---
 .../src/main/protobuf/Client.proto              |   14 +-
 .../hbase/protobuf/generated/ClientProtos.java  | 1128 ++++++++++++++---
 hbase-protocol/src/main/protobuf/Client.proto   |   14 +-
 .../hbase/regionserver/RSRpcServices.java       |   18 +-
 .../hbase/regionserver/ScannerContext.java      |   13 +
 .../hadoop/hbase/regionserver/StoreScanner.java |    1 +
 .../hbase/regionserver/TestScannerCursor.java   |  191 +++
 16 files changed, 2422 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index fa5f868..59cf005 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -499,6 +499,21 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
           break;
         }
       }
+      if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
+        if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
+          // Use cursor row key from server
+          cache.add(Result.createCursorResult(callable.getCursor()));
+          break;
+        }
+        if (values.length > 0) {
+          // The size limit was exceeded, so we need to return the last Result's row.
+          // When the user has called setBatch and the scanner is reopened, the server may
+          // return Results the user has already seen, and the last Result may not be visible because
+          // it has too few cells. The row keys may therefore differ, so we must use the last one.
+          cache.add(Result.createCursorResult(new Cursor(values[values.length 
- 1].getRow())));
+          break;
+        }
+      }
       if (countdown <= 0) {
         // we have enough result.
         closeScannerIfExhausted(regionExhausted);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java
index e4a84d5..a839080 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java
@@ -27,4 +27,8 @@ public class ClientUtil {
   public static boolean areScanStartRowAndStopRowEqual(byte[] startRow, byte[] 
stopRow) {
     return startRow != null && startRow.length > 0 && Bytes.equals(startRow, 
stopRow);
   }
+
+  public static Cursor createCursor(byte[] row) {
+    return new Cursor(row); // Cursor's constructor is package-private; this is the public factory
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cursor.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cursor.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cursor.java
new file mode 100644
index 0000000..1d4b4b5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cursor.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Scan cursor that tells the client where the server is scanning.
+ * @see Scan#setNeedCursorResult(boolean)
+ * @see Result#isCursor()
+ * @see Result#getCursor()
+ */
+@InterfaceAudience.Public
+public class Cursor {
+
+  private final byte[] row;
+
+  Cursor(byte[] row) {
+    this.row = row; // keeps the caller's array by reference (no defensive copy)
+  }
+
+  public byte[] getRow() {
+    return row; // returns the internal array itself, not a copy
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 63aab80..94e1b90 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -108,6 +108,8 @@ public class Result implements CellScannable, CellScanner {
 
   private final boolean readonly;
 
+  private Cursor cursor = null;
+
   /**
    * Creates an empty Result w/ no KeyValue payload; returns null if you call 
{@link #rawCells()}.
    * Use this to represent no results if {@code null} won't do or in old 
'mapred' as opposed
@@ -173,6 +175,15 @@ public class Result implements CellScannable, CellScanner {
     return new Result(cells, null, stale, mayHaveMoreCellsInRow);
   }
 
+  public static Result createCursorResult(Cursor cursor) {
+    return new Result(cursor);
+  }
+
+  private Result(Cursor cursor) {
+    this.cursor = cursor;
+    this.readonly = false;
+  }
+
   /** Private ctor. Use {@link #create(Cell[])}. */
   private Result(Cell[] cells, Boolean exists, boolean stale, boolean 
mayHaveMoreCellsInRow) {
     this.cells = cells;
@@ -948,4 +959,38 @@ public class Result implements CellScannable, CellScanner {
       throw new UnsupportedOperationException("Attempting to modify readonly 
EMPTY_RESULT!");
     }
   }
+
+  /**
+   * Return true if this Result is a cursor that tells users where the server has scanned.
+   * In such a Result the only meaningful method is {@link #getCursor()}.
+   *
+   * <pre>{@code
+   *  while ((r = scanner.next()) != null) {
+   *    if (r.isCursor()) {
+   *      // Scanning has not ended; this is a cursor. Save its row key and close the scanner if
+   *      // you want, or just continue the loop and call next() again.
+   *    } else {
+   *      // a normal Result: handle it just like before
+   *    }
+   *  }
+   *  // scanning has ended
+   *
+   * }</pre>
+   * @see Scan#setNeedCursorResult(boolean)
+   * @see Cursor
+   * @see #getCursor()
+   */
+  public boolean isCursor() {
+    return cursor != null ;
+  }
+
+  /**
+   * Return the cursor if this Result is a cursor result, or {@code null} if it is not.
+   * @see Scan#setNeedCursorResult(boolean)
+   * @see Cursor
+   * @see #isCursor()
+   */
+  public Cursor getCursor(){
+    return cursor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 2746263..639f43e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -185,6 +185,9 @@ public class Scan extends Query {
    * Control whether to use pread at server side.
    */
   private ReadType readType = ReadType.DEFAULT;
+
+  private boolean needCursorResult = false;
+
   /**
    * Create a Scan operation across all rows.
    */
@@ -272,6 +275,7 @@ public class Scan extends Query {
     }
     this.mvccReadPoint = scan.getMvccReadPoint();
     this.limit = scan.getLimit();
+    this.needCursorResult = scan.isNeedCursorResult();
   }
 
   /**
@@ -1170,4 +1174,43 @@ public class Scan extends Query {
   Scan resetMvccReadPoint() {
     return setMvccReadPoint(-1L);
   }
+
+  /**
+   * When the server is slow, or we scan a table with a lot of deleted data, or we use a sparse
+   * filter, the server will respond with heartbeats to prevent timeouts. However, the scanner
+   * returns a Result only when the client is able to. With many heartbeats, the blocking time
+   * on ResultScanner#next() may be very long, which is not friendly to online services.
+   *
+   * Set this to true and you can get a special Result whose #isCursor() returns true and which
+   * does not contain any real data. It only tells you where the server has scanned. You can call
+   * next() to continue scanning or open a new scanner with this row key as start row at any time.
+   *
+   * Users can get a cursor when and only when there is a response from the server but we cannot
+   * return a Result to users, for example, when the response is a heartbeat or there are partial
+   * cells but users do not allow partial results.
+   *
+   * Currently the cursor is at row level, which means the special Result contains only a row key.
+   * @see Result#isCursor()
+   * @see Result#getCursor()
+   * @see Cursor
+   */
+  public Scan setNeedCursorResult(boolean needCursorResult) {
+    this.needCursorResult = needCursorResult;
+    return this;
+  }
+
+  public boolean isNeedCursorResult() {
+    return needCursorResult;
+  }
+
+  /**
+   * Create a new Scan from a cursor. It only sets the position information (the start row key).
+   * The others (like cfs, stop row, limit) should still be filled in by the user.
+   * @see Result#isCursor()
+   * @see Result#getCursor()
+   * @see Cursor
+   */
+  public static Scan createScanFromCursor(Cursor cursor) {
+    return new Scan().withStartRow(cursor.getRow());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index ffac566..4227e41 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -87,6 +87,8 @@ public class ScannerCallable extends 
ClientServiceCallable<Result[]> {
    */
   protected boolean heartbeatMessage = false;
 
+  protected Cursor cursor;
+
   // indicate if it is a remote server call
   protected boolean isRegionServerRemote = true;
   private long nextCallSeq = 0;
@@ -148,7 +150,7 @@ public class ScannerCallable extends 
ClientServiceCallable<Result[]> {
       checkIfRegionServerIsRemote();
       instantiated = true;
     }
-
+    cursor = null;
     // check how often we retry.
     if (reload) {
       incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
@@ -242,7 +244,11 @@ public class ScannerCallable extends 
ClientServiceCallable<Result[]> {
       response = next();
     }
     long timestamp = System.currentTimeMillis();
-    setHeartbeatMessage(response.hasHeartbeatMessage() && 
response.getHeartbeatMessage());
+    boolean isHeartBeat = response.hasHeartbeatMessage() && 
response.getHeartbeatMessage();
+    setHeartbeatMessage(isHeartBeat);
+    if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
+      cursor = ProtobufUtil.toCursor(response.getCursor());
+    }
     Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), 
response);
     if (logScannerActivity) {
       long now = System.currentTimeMillis();
@@ -288,6 +294,10 @@ public class ScannerCallable extends 
ClientServiceCallable<Result[]> {
     return heartbeatMessage;
   }
 
+  public Cursor getCursor() {
+    return cursor;
+  }
+
   private void setHeartbeatMessage(boolean heartbeatMessage) {
     this.heartbeatMessage = heartbeatMessage;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index bcd5d21..0cdd4dd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -302,6 +302,10 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
     return currentScannerCallable != null && 
currentScannerCallable.isHeartbeatMessage();
   }
 
+  public Cursor getCursor() {
+    return currentScannerCallable != null ? currentScannerCallable.getCursor() 
: null;
+  }
+
   private void addCallsForCurrentReplica(
       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, 
RegionLocations rl) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 8a4e412..5c4dd55 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.ClientUtil;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Cursor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -185,6 +186,7 @@ import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.NonceKey;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapreduce.tools.CLI;
 
 /**
  * Protobufs utility.
@@ -1099,6 +1101,9 @@ public final class ProtobufUtil {
     if (scan.getReadType() != Scan.ReadType.DEFAULT) {
       scanBuilder.setReadType(toReadType(scan.getReadType()));
     }
+    if (scan.isNeedCursorResult()) {
+      scanBuilder.setNeedCursorResult(true);
+    }
     return scanBuilder.build();
   }
 
@@ -1207,9 +1212,28 @@ public final class ProtobufUtil {
     } else if (proto.hasReadType()) {
       scan.setReadType(toReadType(proto.getReadType()));
     }
+    if (proto.getNeedCursorResult()) {
+      scan.setNeedCursorResult(true);
+    }
     return scan;
   }
 
+  /** Converts a client {@link Cursor} into its protobuf message, copying the cursor's row key. */
+  public static ClientProtos.Cursor toCursor(Cursor cursor) {
+    // setRow must be applied to the builder that is built, otherwise the message has no row.
+    return ClientProtos.Cursor.newBuilder().setRow(ByteString.copyFrom(cursor.getRow())).build();
+  }
+
+  public static ClientProtos.Cursor toCursor(Cell cell) {
+    return ClientProtos.Cursor.newBuilder()
+        .setRow(ByteString.copyFrom(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength()))
+        .build();
+  }
+
+  public static Cursor toCursor(ClientProtos.Cursor cursor) {
+    return ClientUtil.createCursor(cursor.getRow().toByteArray());
+  }
+
   /**
    * Create a protocol buffer Get based on a client Get.
    *

Reply via email to