HBASE-15576 Scanning cursor to prevent blocking long time on ResultScanner.next()
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2f1923a8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2f1923a8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2f1923a8 Branch: refs/heads/master Commit: 2f1923a8233b0c999494cd4b33f85b70dc5d7b12 Parents: 80e15aa Author: Phil Yang <yangzhe1...@apache.org> Authored: Thu May 25 15:18:58 2017 +0800 Committer: Phil Yang <yangzhe1...@apache.org> Committed: Wed Jun 7 11:32:04 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ClientScanner.java | 15 + .../apache/hadoop/hbase/client/ClientUtil.java | 4 + .../org/apache/hadoop/hbase/client/Cursor.java | 41 + .../org/apache/hadoop/hbase/client/Result.java | 45 + .../org/apache/hadoop/hbase/client/Scan.java | 43 + .../hadoop/hbase/client/ScannerCallable.java | 14 +- .../client/ScannerCallableWithReplicas.java | 4 + .../hbase/shaded/protobuf/ProtobufUtil.java | 24 + .../shaded/protobuf/generated/ClientProtos.java | 1172 +++++++++++++++--- .../src/main/protobuf/Client.proto | 14 +- .../hbase/protobuf/generated/ClientProtos.java | 1128 ++++++++++++++--- hbase-protocol/src/main/protobuf/Client.proto | 14 +- .../hbase/regionserver/RSRpcServices.java | 18 +- .../hbase/regionserver/ScannerContext.java | 13 + .../hadoop/hbase/regionserver/StoreScanner.java | 1 + .../hbase/regionserver/TestScannerCursor.java | 191 +++ 16 files changed, 2422 insertions(+), 319 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index fa5f868..59cf005 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -499,6 +499,21 @@ public abstract class ClientScanner extends AbstractClientScanner { break; } } + if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) { + if (callable.isHeartbeatMessage() && callable.getCursor() != null) { + // Use cursor row key from server + cache.add(Result.createCursorResult(callable.getCursor())); + break; + } + if (values.length > 0) { + // It is size limit exceed and we need return the last Result's row. + // When user setBatch and the scanner is reopened, the server may return Results that + // user has seen and the last Result can not be seen because the number is not enough. + // So the row keys of results may not be same, we must use the last one. + cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow()))); + break; + } + } if (countdown <= 0) { // we have enough result. 
closeScannerIfExhausted(regionExhausted); http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java index e4a84d5..a839080 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientUtil.java @@ -27,4 +27,8 @@ public class ClientUtil { public static boolean areScanStartRowAndStopRowEqual(byte[] startRow, byte[] stopRow) { return startRow != null && startRow.length > 0 && Bytes.equals(startRow, stopRow); } + + public static Cursor createCursor(byte[] row) { + return new Cursor(row); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cursor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cursor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cursor.java new file mode 100644 index 0000000..1d4b4b5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cursor.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Scan cursor to tell client where server is scanning + * {@link Scan#setNeedCursorResult(boolean)} + * {@link Result#isCursor()} + * {@link Result#getCursor()} + */ +@InterfaceAudience.Public +public class Cursor { + + private final byte[] row; + + Cursor(byte[] row) { + this.row = row; + } + + public byte[] getRow() { + return row; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 63aab80..94e1b90 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -108,6 +108,8 @@ public class Result implements CellScannable, CellScanner { private final boolean readonly; + private Cursor cursor = null; + /** * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}. 
* Use this to represent no results if {@code null} won't do or in old 'mapred' as opposed @@ -173,6 +175,15 @@ public class Result implements CellScannable, CellScanner { return new Result(cells, null, stale, mayHaveMoreCellsInRow); } + public static Result createCursorResult(Cursor cursor) { + return new Result(cursor); + } + + private Result(Cursor cursor) { + this.cursor = cursor; + this.readonly = false; + } + /** Private ctor. Use {@link #create(Cell[])}. */ private Result(Cell[] cells, Boolean exists, boolean stale, boolean mayHaveMoreCellsInRow) { this.cells = cells; @@ -948,4 +959,38 @@ public class Result implements CellScannable, CellScanner { throw new UnsupportedOperationException("Attempting to modify readonly EMPTY_RESULT!"); } } + + /** + * Return true if this Result is a cursor to tell users where the server has scanned. + * In this Result the only meaningful method is {@link #getCursor()}. + * + * {@code + * while ((r = scanner.next()) != null) { + * if(r.isCursor()){ + * // scanning has not ended; this is a cursor. Save its row key and close the scanner if you want, or + * // just continue the loop to call next(). + * } else { + * // just like before + * } + * } + * // scanning has ended + * + * } + * {@link Scan#setNeedCursorResult(boolean)} + * {@link Cursor} + * {@link #getCursor()} + */ + public boolean isCursor() { + return cursor != null ; + } + + /** + * Return the cursor if this Result is a cursor result. 
+ * {@link Scan#setNeedCursorResult(boolean)} + * {@link Cursor} + * {@link #isCursor()} + */ + public Cursor getCursor(){ + return cursor; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 2746263..639f43e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -185,6 +185,9 @@ public class Scan extends Query { * Control whether to use pread at server side. */ private ReadType readType = ReadType.DEFAULT; + + private boolean needCursorResult = false; + /** * Create a Scan operation across all rows. */ @@ -272,6 +275,7 @@ public class Scan extends Query { } this.mvccReadPoint = scan.getMvccReadPoint(); this.limit = scan.getLimit(); + this.needCursorResult = scan.isNeedCursorResult(); } /** @@ -1170,4 +1174,43 @@ public class Scan extends Query { Scan resetMvccReadPoint() { return setMvccReadPoint(-1L); } + + /** + * When the server is slow or we scan a table with a lot of deleted data or we use a sparse filter, + * the server will respond with heartbeats to prevent timeouts. However the scanner will return a Result + * only when client can do it. So if there are many heartbeats, the blocking time on + * ResultScanner#next() may be very long, which is not friendly to online services. + * + * Set this to true then you can get a special Result whose #isCursor() returns true and which does not + * contain any real data. It only tells you where the server has scanned. You can call next + * to continue scanning or open a new scanner with this row key as start row whenever you want. 
+ * + * Users can get a cursor when and only when there is a response from the server but we can not + * return a Result to users, for example, this response is a heartbeat or there are partial cells + * but users do not allow partial result. + * + * Now the cursor is in row level which means the special Result will only contains a row key. + * {@link Result#isCursor()} + * {@link Result#getCursor()} + * {@link Cursor} + */ + public Scan setNeedCursorResult(boolean needCursorResult) { + this.needCursorResult = needCursorResult; + return this; + } + + public boolean isNeedCursorResult() { + return needCursorResult; + } + + /** + * Create a new Scan with a cursor. It only set the position information like start row key. + * The others (like cfs, stop row, limit) should still be filled in by the user. + * {@link Result#isCursor()} + * {@link Result#getCursor()} + * {@link Cursor} + */ + public static Scan createScanFromCursor(Cursor cursor) { + return new Scan().withStartRow(cursor.getRow()); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index ffac566..4227e41 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -87,6 +87,8 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> { */ protected boolean heartbeatMessage = false; + protected Cursor cursor; + // indicate if it is a remote server call protected boolean isRegionServerRemote = true; private long nextCallSeq = 0; @@ -148,7 +150,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> { checkIfRegionServerIsRemote(); 
instantiated = true; } - + cursor = null; // check how often we retry. if (reload) { incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); @@ -242,7 +244,11 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> { response = next(); } long timestamp = System.currentTimeMillis(); - setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); + boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage(); + setHeartbeatMessage(isHeartBeat); + if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) { + cursor = ProtobufUtil.toCursor(response.getCursor()); + } Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); if (logScannerActivity) { long now = System.currentTimeMillis(); @@ -288,6 +294,10 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> { return heartbeatMessage; } + public Cursor getCursor() { + return cursor; + } + private void setHeartbeatMessage(boolean heartbeatMessage) { this.heartbeatMessage = heartbeatMessage; } http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index bcd5d21..0cdd4dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -302,6 +302,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); } + public Cursor getCursor() { + return currentScannerCallable != null ? 
currentScannerCallable.getCursor() : null; + } + private void addCallsForCurrentReplica( ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1923a8/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 8a4e412..5c4dd55 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ClientUtil; import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Cursor; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -185,6 +186,7 @@ import org.apache.hadoop.hbase.util.Methods; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.mapreduce.tools.CLI; /** * Protobufs utility. 
@@ -1099,6 +1101,9 @@ public final class ProtobufUtil {
     if (scan.getReadType() != Scan.ReadType.DEFAULT) {
       scanBuilder.setReadType(toReadType(scan.getReadType()));
     }
+    if (scan.isNeedCursorResult()) {
+      scanBuilder.setNeedCursorResult(true);
+    }
     return scanBuilder.build();
   }
 
@@ -1207,9 +1212,28 @@ public final class ProtobufUtil {
     } else if (proto.hasReadType()) {
       scan.setReadType(toReadType(proto.getReadType()));
     }
+    if (proto.getNeedCursorResult()) {
+      scan.setNeedCursorResult(true);
+    }
     return scan;
   }
 
+  public static ClientProtos.Cursor toCursor(Cursor cursor) {
+    ClientProtos.Cursor.Builder builder = ClientProtos.Cursor.newBuilder();
+    builder.setRow(ByteString.copyFrom(cursor.getRow())); // populate the builder that is built
+    return builder.build();
+  }
+
+  public static ClientProtos.Cursor toCursor(Cell cell) {
+    return ClientProtos.Cursor.newBuilder()
+        .setRow(ByteString.copyFrom(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
+        .build();
+  }
+
+  public static Cursor toCursor(ClientProtos.Cursor cursor) {
+    return ClientUtil.createCursor(cursor.getRow().toByteArray());
+  }
+
   /**
    * Create a protocol buffer Get based on a client Get.
    *