HBASE-17167 Pass mvcc to client when scan
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/890fcbd0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/890fcbd0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/890fcbd0 Branch: refs/heads/master Commit: 890fcbd0e6f916cc94b45b881b0cc060cc1e835c Parents: 7c43a23 Author: zhangduo <zhang...@apache.org> Authored: Tue Nov 29 17:13:49 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Nov 30 10:11:04 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ClientScanner.java | 406 +++++++++--------- .../org/apache/hadoop/hbase/client/HTable.java | 7 +- .../client/PackagePrivateFieldAccessor.java | 41 ++ .../org/apache/hadoop/hbase/client/Scan.java | 55 ++- .../hadoop/hbase/client/ScannerCallable.java | 3 + .../hadoop/hbase/protobuf/ProtobufUtil.java | 8 + .../hbase/shaded/protobuf/ProtobufUtil.java | 8 + .../shaded/protobuf/generated/ClientProtos.java | 412 +++++++++++++----- .../src/main/protobuf/Client.proto | 12 +- .../hbase/protobuf/generated/ClientProtos.java | 416 ++++++++++++++----- hbase-protocol/src/main/protobuf/Client.proto | 12 +- .../hadoop/hbase/regionserver/HRegion.java | 9 +- .../hbase/regionserver/RSRpcServices.java | 4 +- .../hbase/TestPartialResultsFromClientSide.java | 13 +- .../hbase/client/TestMvccConsistentScanner.java | 134 ++++++ .../hadoop/hbase/regionserver/TestTags.java | 14 +- .../regionserver/TestReplicationSink.java | 22 +- 17 files changed, 1120 insertions(+), 456 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 20ed183..c4c86a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -120,198 +120,192 @@ public abstract class ClientScanner extends AbstractClientScanner { ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Scan table=" + tableName - + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); - } - this.scan = scan; - this.tableName = tableName; - this.lastNext = System.currentTimeMillis(); - this.connection = connection; - this.pool = pool; - this.primaryOperationTimeout = primaryOperationTimeout; - this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - if (scan.getMaxResultSize() > 0) { - this.maxScannerResultSize = scan.getMaxResultSize(); - } else { - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - } - this.scannerTimeout = HBaseConfiguration.getInt(conf, - HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - - // check if application wants to collect scan metrics - initScanMetrics(scan); - - // Use the caching from the Scan. If not set, use the default cache setting for this table. - if (this.scan.getCaching() > 0) { - this.caching = this.scan.getCaching(); - } else { - this.caching = conf.getInt( - HConstants.HBASE_CLIENT_SCANNER_CACHING, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); - } - - this.caller = rpcFactory.<Result[]> newCaller(); - this.rpcControllerFactory = controllerFactory; - - this.conf = conf; - initCache(); - initializeScannerInConstruction(); - } + if (LOG.isTraceEnabled()) { + LOG.trace( + "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); + } + this.scan = scan; + this.tableName = tableName; + this.lastNext = System.currentTimeMillis(); + this.connection = connection; + this.pool = pool; + this.primaryOperationTimeout = primaryOperationTimeout; + this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + if (scan.getMaxResultSize() > 0) { + this.maxScannerResultSize = scan.getMaxResultSize(); + } else { + this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + } + this.scannerTimeout = + HBaseConfiguration.getInt(conf, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + + // check if application wants to collect scan metrics + initScanMetrics(scan); + + // Use the caching from the Scan. If not set, use the default cache setting for this table. + if (this.scan.getCaching() > 0) { + this.caching = this.scan.getCaching(); + } else { + this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + } + + this.caller = rpcFactory.<Result[]> newCaller(); + this.rpcControllerFactory = controllerFactory; + + this.conf = conf; + initCache(); + initializeScannerInConstruction(); + } - protected abstract void initCache(); + protected abstract void initCache(); - protected void initializeScannerInConstruction() throws IOException{ - // initialize the scanner - nextScanner(this.caching, false); - } + protected void initializeScannerInConstruction() throws IOException { + // initialize the scanner + nextScanner(this.caching, false); + } - protected ClusterConnection getConnection() { - return this.connection; - } + protected ClusterConnection getConnection() { + return this.connection; + } - protected TableName getTable() { - return this.tableName; - } + protected TableName getTable() { + return this.tableName; + } - protected int getRetries() { - return this.retries; - } + protected int getRetries() { + return this.retries; + } - protected int getScannerTimeout() { - return this.scannerTimeout; - } + protected int getScannerTimeout() { + return this.scannerTimeout; + } - protected Configuration getConf() { - return this.conf; - } + protected Configuration getConf() { + return this.conf; + } - protected Scan getScan() { - return scan; - } + protected Scan getScan() { + return scan; + } - protected ExecutorService getPool() { - return pool; - } + protected ExecutorService getPool() { + return pool; + } - protected int getPrimaryOperationTimeout() { - return primaryOperationTimeout; - } + protected int getPrimaryOperationTimeout() { + return primaryOperationTimeout; + } - protected int getCaching() { - return caching; - } + protected int getCaching() { + return caching; + } - protected long getTimestamp() { - return lastNext; - } + protected long getTimestamp() { + return lastNext; + } - @VisibleForTesting - protected long getMaxResultSize() { - return maxScannerResultSize; - } + @VisibleForTesting + protected long getMaxResultSize() { + return maxScannerResultSize; + } - // returns true if the passed region endKey - protected boolean checkScanStopRow(final byte [] endKey) { - if (this.scan.getStopRow().length > 0) { - // there is a stop row, check to see if we are past it. - byte [] stopRow = scan.getStopRow(); - int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, - endKey, 0, endKey.length); - if (cmp <= 0) { - // stopRow <= endKey (endKey is equals to or larger than stopRow) - // This is a stop. - return true; - } + // returns true if the passed region endKey + protected boolean checkScanStopRow(final byte[] endKey) { + if (this.scan.getStopRow().length > 0) { + // there is a stop row, check to see if we are past it. + byte[] stopRow = scan.getStopRow(); + int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, endKey, 0, endKey.length); + if (cmp <= 0) { + // stopRow <= endKey (endKey is equals to or larger than stopRow) + // This is a stop. + return true; } - return false; //unlikely. - } - - private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException { - // If we have just switched replica, don't go to the next scanner yet. Rather, try - // the scanner operations on the new replica, from the right point in the scan - // Note that when we switched to a different replica we left it at a point - // where we just did the "openScanner" with the appropriate startrow - if (callable != null && callable.switchedToADifferentReplica()) return true; - return nextScanner(nbRows, done); } + return false; // unlikely. + } - /* - * Gets a scanner for the next region. If this.currentRegion != null, then - * we will move to the endrow of this.currentRegion. Else we will get - * scanner at the scan.getStartRow(). We will go no further, just tidy - * up outstanding scanners, if <code>currentRegion != null</code> and - * <code>done</code> is true. - * @param nbRows - * @param done Server-side says we're done scanning. - */ - protected boolean nextScanner(int nbRows, final boolean done) - throws IOException { - // Close the previous scanner if it's open - if (this.callable != null) { - this.callable.setClose(); - call(callable, caller, scannerTimeout); - this.callable = null; - } + private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException { + // If we have just switched replica, don't go to the next scanner yet. Rather, try + // the scanner operations on the new replica, from the right point in the scan + // Note that when we switched to a different replica we left it at a point + // where we just did the "openScanner" with the appropriate startrow + if (callable != null && callable.switchedToADifferentReplica()) return true; + return nextScanner(nbRows, done); + } - // Where to start the next scanner - byte [] localStartKey; - - // if we're at end of table, close and return false to stop iterating - if (this.currentRegion != null) { - byte [] endKey = this.currentRegion.getEndKey(); - if (endKey == null || - Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || - checkScanStopRow(endKey) || - done) { - close(); - if (LOG.isTraceEnabled()) { - LOG.trace("Finished " + this.currentRegion); - } - return false; - } - localStartKey = endKey; + /* + * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the + * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no + * further, just tidy up outstanding scanners, if <code>currentRegion != null</code> and + * <code>done</code> is true. + * @param nbRows + * @param done Server-side says we're done scanning. + */ + protected boolean nextScanner(int nbRows, final boolean done) throws IOException { + // Close the previous scanner if it's open + if (this.callable != null) { + this.callable.setClose(); + call(callable, caller, scannerTimeout); + this.callable = null; + } + + // Where to start the next scanner + byte[] localStartKey; + + // if we're at end of table, close and return false to stop iterating + if (this.currentRegion != null) { + byte[] endKey = this.currentRegion.getEndKey(); + if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(endKey) || done) { + close(); if (LOG.isTraceEnabled()) { LOG.trace("Finished " + this.currentRegion); } - } else { - localStartKey = this.scan.getStartRow(); + return false; } - - if (LOG.isDebugEnabled() && this.currentRegion != null) { - // Only worth logging if NOT first region in scan. - LOG.debug("Advancing internal scanner to startKey at '" + - Bytes.toStringBinary(localStartKey) + "'"); + localStartKey = endKey; + // clear mvcc read point if we are going to switch regions + scan.resetMvccReadPoint(); + if (LOG.isTraceEnabled()) { + LOG.trace("Finished " + this.currentRegion); } - try { - callable = getScannerCallable(localStartKey, nbRows); - // Open a scanner on the region server starting at the - // beginning of the region - call(callable, caller, scannerTimeout); - this.currentRegion = callable.getHRegionInfo(); - if (this.scanMetrics != null) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - } catch (IOException e) { - close(); - throw e; + } else { + localStartKey = this.scan.getStartRow(); + } + + if (LOG.isDebugEnabled() && this.currentRegion != null) { + // Only worth logging if NOT first region in scan. + LOG.debug( + "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); + } + try { + callable = getScannerCallable(localStartKey, nbRows); + // Open a scanner on the region server starting at the + // beginning of the region + call(callable, caller, scannerTimeout); + this.currentRegion = callable.getHRegionInfo(); + if (this.scanMetrics != null) { + this.scanMetrics.countOfRegions.incrementAndGet(); } - return true; + } catch (IOException e) { + close(); + throw e; } + return true; + } @VisibleForTesting boolean isAnyRPCcancelled() { return callable.isAnyRPCcancelled(); } - Result[] call(ScannerCallableWithReplicas callable, - RpcRetryingCaller<Result[]> caller, int scannerTimeout) - throws IOException, RuntimeException { + Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller, + int scannerTimeout) throws IOException, RuntimeException { if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -320,61 +314,57 @@ public abstract class ClientScanner extends AbstractClientScanner { return caller.callWithoutRetries(callable, scannerTimeout); } - @InterfaceAudience.Private - protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey, - int nbRows) { - scan.setStartRow(localStartKey); - ScannerCallable s = - new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, - this.rpcControllerFactory); - s.setCaching(nbRows); - ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), - s, pool, primaryOperationTimeout, scan, - retries, scannerTimeout, caching, conf, caller); - return sr; - } + @InterfaceAudience.Private + protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, int nbRows) { + scan.setStartRow(localStartKey); + ScannerCallable s = new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, + this.rpcControllerFactory); + s.setCaching(nbRows); + ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), s, + pool, primaryOperationTimeout, scan, retries, scannerTimeout, caching, conf, caller); + return sr; + } - /** - * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the - * application or TableInputFormat.Later, we could push it to other systems. We don't use - * metrics framework because it doesn't support multi-instances of the same metrics on the same - * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time. - * - * By default, scan metrics are disabled; if the application wants to collect them, this - * behavior can be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)} - * - * <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance. - */ - protected void writeScanMetrics() { - if (this.scanMetrics == null || scanMetricsPublished) { - return; - } - MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics); - scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray()); - scanMetricsPublished = true; + /** + * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the + * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics + * framework because it doesn't support multi-instances of the same metrics on the same machine; + * for scan/map reduce scenarios, we will have multiple scans running at the same time. By + * default, scan metrics are disabled; if the application wants to collect them, this behavior can + * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)} + * <p> + * This invocation clears the scan metrics. Metrics are aggregated in the Scan instance. + */ + protected void writeScanMetrics() { + if (this.scanMetrics == null || scanMetricsPublished) { + return; } + MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics); + scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray()); + scanMetricsPublished = true; + } - protected void initSyncCache() { + protected void initSyncCache() { cache = new LinkedList<Result>(); } - protected Result nextWithSyncCache() throws IOException { - // If the scanner is closed and there's nothing left in the cache, next is a no-op. - if (cache.size() == 0 && this.closed) { - return null; - } - if (cache.size() == 0) { - loadCache(); - } - - if (cache.size() > 0) { - return cache.poll(); - } - - // if we exhausted this scanner before calling close, write out the scan metrics - writeScanMetrics(); + protected Result nextWithSyncCache() throws IOException { + // If the scanner is closed and there's nothing left in the cache, next is a no-op. + if (cache.size() == 0 && this.closed) { return null; } + if (cache.size() == 0) { + loadCache(); + } + + if (cache.size() > 0) { + return cache.poll(); + } + + // if we exhausted this scanner before calling close, write out the scan metrics + writeScanMetrics(); + return null; + } @VisibleForTesting public int getCacheSize() { http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index b2c012d..c56132c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -334,7 +334,7 @@ public class HTable implements Table { * {@link Table#getScanner(Scan)} has other usage details. */ @Override - public ResultScanner getScanner(final Scan scan) throws IOException { + public ResultScanner getScanner(Scan scan) throws IOException { if (scan.getBatch() > 0 && scan.isSmall()) { throw new IllegalArgumentException("Small scan should not be used with batching"); } @@ -345,7 +345,10 @@ public class HTable implements Table { if (scan.getMaxResultSize() <= 0) { scan.setMaxResultSize(scannerMaxResultSize); } - + if (scan.getMvccReadPoint() > 0) { + // it is not supposed to be set by user, clear + scan.resetMvccReadPoint(); + } Boolean async = scan.isAsyncPrefetch(); if (async == null) { async = connConfiguration.isClientScannerAsyncPrefetch(); http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java new file mode 100644 index 0000000..6a3ac18 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PackagePrivateFieldAccessor.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A helper class used to access the package private field in o.a.h.h.client package. + * <p> + * This is because we share some data structures between client and server and the data structures + * are marked as {@code InterfaceAudience.Public}, but we do not want to expose some of the fields + * to end user. + * <p> + * TODO: A better solution is to separate the data structures used in client and server. + */ +@InterfaceAudience.Private +public class PackagePrivateFieldAccessor { + + public static void setMvccReadPoint(Scan scan, long mvccReadPoint) { + scan.setMvccReadPoint(mvccReadPoint); + } + + public static long getMvccReadPoint(Scan scan) { + return scan.getMvccReadPoint(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index b0d361c..9d659b8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -154,27 +154,24 @@ public class Scan extends Query { */ public static final boolean DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = false; - /** - * Set it true for small scan to get better performance - * - * Small scan should use pread and big scan can use seek + read - * - * seek + read is fast but can cause two problem (1) resource contention (2) - * cause too much network io - * - * [89-fb] Using pread for non-compaction read request - * https://issues.apache.org/jira/browse/HBASE-7266 - * - * On the other hand, if setting it true, we would do - * openScanner,next,closeScanner in one RPC call. It means the better - * performance for small scan. [HBASE-9488]. - * - * Generally, if the scan range is within one data block(64KB), it could be - * considered as a small scan. + /** + * Set it true for small scan to get better performance Small scan should use pread and big scan + * can use seek + read seek + read is fast but can cause two problem (1) resource contention (2) + * cause too much network io [89-fb] Using pread for non-compaction read request + * https://issues.apache.org/jira/browse/HBASE-7266 On the other hand, if setting it true, we + * would do openScanner,next,closeScanner in one RPC call. It means the better performance for + * small scan. [HBASE-9488]. Generally, if the scan range is within one data block(64KB), it could + * be considered as a small scan. */ private boolean small = false; /** + * The mvcc read point to use when open a scanner. Remember to clear it after switching regions as + * the mvcc is only valid within region scope. + */ + private long mvccReadPoint = -1L; + + /** * Create a Scan operation across all rows. */ public Scan() {} @@ -253,6 +250,7 @@ public class Scan extends Query { TimeRange tr = entry.getValue(); setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } + this.mvccReadPoint = scan.getMvccReadPoint(); } /** @@ -281,6 +279,7 @@ public class Scan extends Query { TimeRange tr = entry.getValue(); setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } + this.mvccReadPoint = -1L; } public boolean isGetScan() { @@ -976,4 +975,26 @@ public class Scan extends Query { this.asyncPrefetch = asyncPrefetch; return this; } + + /** + * Get the mvcc read point used to open a scanner. + */ + long getMvccReadPoint() { + return mvccReadPoint; + } + + /** + * Set the mvcc read point used to open a scanner. + */ + Scan setMvccReadPoint(long mvccReadPoint) { + this.mvccReadPoint = mvccReadPoint; + return this; + } + + /** + * Set the mvcc read point to -1 which means do not use it. + */ + Scan resetMvccReadPoint() { + return setMvccReadPoint(-1L); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 0351e54..7a22648 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -375,6 +375,9 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region " + getLocation().toString()); } + if (response.hasMvccReadPoint()) { + this.scan.setMvccReadPoint(response.getMvccReadPoint()); + } return id; } catch (Exception e) { throw ProtobufUtil.handleRemoteException(e); http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 330348d..c52d413 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -907,6 +908,10 @@ public final class ProtobufUtil { if (scan.getCaching() > 0) { scanBuilder.setCaching(scan.getCaching()); } + long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); + if (mvccReadPoint > 0) { + scanBuilder.setMvccReadPoint(mvccReadPoint); + } return scanBuilder.build(); } @@ -994,6 +999,9 @@ public final class ProtobufUtil { if (proto.hasCaching()) { scan.setCaching(proto.getCaching()); } + if (proto.hasMvccReadPoint()) { + PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint()); + } return scan; } http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 7d1770e..5876fae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLoadStats; import org.apache.hadoop.hbase.client.Result; @@ -1019,6 +1020,10 @@ public final class ProtobufUtil { if (scan.getCaching() > 0) { scanBuilder.setCaching(scan.getCaching()); } + long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); + if (mvccReadPoint > 0) { + scanBuilder.setMvccReadPoint(mvccReadPoint); + } return scanBuilder.build(); } @@ -1106,6 +1111,9 @@ public final class ProtobufUtil { if (proto.hasCaching()) { scan.setCaching(proto.getCaching()); } + if (proto.hasMvccReadPoint()) { + PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint()); + } return scan; } http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java index bfd196e..e9458df 100644 --- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java +++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java @@ -14554,6 +14554,15 @@ public final class ClientProtos { */ org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilyTimeRangeOrBuilder getCfTimeRangeOrBuilder( int index); + + /** + * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code> + */ + boolean hasMvccReadPoint(); + /** + * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code> + */ + long getMvccReadPoint(); } /** * <pre> @@ -14594,6 +14603,7 @@ public final class ClientProtos { caching_ = 0; allowPartialResults_ = false; cfTimeRange_ = java.util.Collections.emptyList(); + mvccReadPoint_ = 0L; } @java.lang.Override @@ -14753,6 +14763,11 @@ public final class ClientProtos { input.readMessage(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilyTimeRange.PARSER, extensionRegistry)); break; } + case 160: { + bitField0_ |= 0x00010000; + mvccReadPoint_ = input.readUInt64(); + break; + } } } } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) { @@ -15153,6 +15168,21 @@ public final class ClientProtos { return cfTimeRange_.get(index); } + public static final int MVCC_READ_POINT_FIELD_NUMBER = 20; + private long mvccReadPoint_; + /** + * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code> + */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00010000) == 0x00010000); + } + /** + * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code> + */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; @@ -15246,6 +15276,9 @@ public final class ClientProtos { for (int i = 0; i < cfTimeRange_.size(); i++) { output.writeMessage(19, cfTimeRange_.get(i)); } + if (((bitField0_ & 0x00010000) == 0x00010000)) { + output.writeUInt64(20, mvccReadPoint_); + } unknownFields.writeTo(output); } @@ -15330,6 +15363,10 @@ public final class ClientProtos { size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream .computeMessageSize(19, cfTimeRange_.get(i)); } + if (((bitField0_ & 0x00010000) == 0x00010000)) { + size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream + .computeUInt64Size(20, mvccReadPoint_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -15432,6 +15469,11 @@ public final class ClientProtos { } result = result && getCfTimeRangeList() .equals(other.getCfTimeRangeList()); + result = result && (hasMvccReadPoint() == other.hasMvccReadPoint()); + if (hasMvccReadPoint()) { + result = result && (getMvccReadPoint() + == other.getMvccReadPoint()); + } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -15525,6 +15567,11 @@ public final class ClientProtos { hash = (37 * hash) + CF_TIME_RANGE_FIELD_NUMBER; hash = (53 * hash) + getCfTimeRangeList().hashCode(); } + if (hasMvccReadPoint()) { + hash = (37 * hash) + MVCC_READ_POINT_FIELD_NUMBER; + hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong( + getMvccReadPoint()); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -15716,6 +15763,8 @@ public final class ClientProtos { } else { cfTimeRangeBuilder_.clear(); } + mvccReadPoint_ = 0L; + bitField0_ = (bitField0_ & ~0x00080000); return this; } @@ -15839,6 +15888,10 @@ public final class ClientProtos { } else { result.cfTimeRange_ = cfTimeRangeBuilder_.build(); } + if (((from_bitField0_ & 0x00080000) == 0x00080000)) { + to_bitField0_ |= 0x00010000; + } + result.mvccReadPoint_ = mvccReadPoint_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -16007,6 +16060,9 @@ public final class ClientProtos { } } } + if (other.hasMvccReadPoint()) { + setMvccReadPoint(other.getMvccReadPoint()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -17484,6 +17540,38 @@ public final class ClientProtos { } return cfTimeRangeBuilder_; } + + private long mvccReadPoint_ ; + /** + * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code> + */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00080000) == 0x00080000); + } + /** + * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code> + */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + /** + * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code> + */ + public Builder setMvccReadPoint(long value) { + bitField0_ |= 0x00080000; + mvccReadPoint_ = value; + onChanged(); + return this; + } + /** + * <code>optional uint64 mvcc_read_point = 20 [default = 0];</code> + */ + public Builder clearMvccReadPoint() { + bitField0_ = (bitField0_ & ~0x00080000); + mvccReadPoint_ = 0L; + onChanged(); + return this; + } public final Builder setUnknownFields( final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFields(unknownFields); @@ -19311,6 +19399,27 @@ public final class ClientProtos { * <code>optional .hbase.pb.ScanMetrics scan_metrics = 10;</code> */ org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder(); + + /** + * <pre> + * The mvcc read point which is used to open the scanner at server side. Client can + * make use of this mvcc_read_point when restarting a scanner to get a consistent view + * of a row. + * </pre> + * + * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code> + */ + boolean hasMvccReadPoint(); + /** + * <pre> + * The mvcc read point which is used to open the scanner at server side. Client can + * make use of this mvcc_read_point when restarting a scanner to get a consistent view + * of a row. + * </pre> + * + * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code> + */ + long getMvccReadPoint(); } /** * <pre> @@ -19339,6 +19448,7 @@ public final class ClientProtos { partialFlagPerResult_ = java.util.Collections.emptyList(); moreResultsInRegion_ = false; heartbeatMessage_ = false; + mvccReadPoint_ = 0L; } @java.lang.Override @@ -19463,6 +19573,11 @@ public final class ClientProtos { bitField0_ |= 0x00000040; break; } + case 88: { + bitField0_ |= 0x00000080; + mvccReadPoint_ = input.readUInt64(); + break; + } } } } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) { @@ -19821,6 +19936,33 @@ public final class ClientProtos { return scanMetrics_ == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance() : scanMetrics_; } + public static final int MVCC_READ_POINT_FIELD_NUMBER = 11; + private long mvccReadPoint_; + /** + * <pre> + * The mvcc read point which is used to open the scanner at server side. Client can + * make use of this mvcc_read_point when restarting a scanner to get a consistent view + * of a row. + * </pre> + * + * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code> + */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * <pre> + * The mvcc read point which is used to open the scanner at server side. Client can + * make use of this mvcc_read_point when restarting a scanner to get a consistent view + * of a row. + * </pre> + * + * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code> + */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; @@ -19863,6 +20005,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeMessage(10, getScanMetrics()); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeUInt64(11, mvccReadPoint_); + } unknownFields.writeTo(output); } @@ -19918,6 +20063,10 @@ public final class ClientProtos { size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream .computeMessageSize(10, getScanMetrics()); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream + .computeUInt64Size(11, mvccReadPoint_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -19976,6 +20125,11 @@ public final class ClientProtos { result = result && getScanMetrics() .equals(other.getScanMetrics()); } + result = result && (hasMvccReadPoint() == other.hasMvccReadPoint()); + if (hasMvccReadPoint()) { + result = result && (getMvccReadPoint() + == other.getMvccReadPoint()); + } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -20032,6 +20186,11 @@ public final class ClientProtos { hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER; hash = (53 * hash) + getScanMetrics().hashCode(); } + if (hasMvccReadPoint()) { + hash = (37 * hash) + MVCC_READ_POINT_FIELD_NUMBER; + hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashLong( + getMvccReadPoint()); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -20186,6 +20345,8 @@ public final class ClientProtos { scanMetricsBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000200); + mvccReadPoint_ = 0L; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -20261,6 +20422,10 @@ public final class ClientProtos { } else { result.scanMetrics_ = scanMetricsBuilder_.build(); } + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000080; + } + result.mvccReadPoint_ = mvccReadPoint_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -20370,6 +20535,9 @@ public final class ClientProtos { if (other.hasScanMetrics()) { mergeScanMetrics(other.getScanMetrics()); } + if (other.hasMvccReadPoint()) { + setMvccReadPoint(other.getMvccReadPoint()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -21433,6 +21601,62 @@ public final class ClientProtos { } return scanMetricsBuilder_; } + + private long mvccReadPoint_ ; + /** + * <pre> + * The mvcc read point which is used to open the scanner at server side. Client can + * make use of this mvcc_read_point when restarting a scanner to get a consistent view + * of a row. + * </pre> + * + * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code> + */ + public boolean hasMvccReadPoint() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * <pre> + * The mvcc read point which is used to open the scanner at server side. Client can + * make use of this mvcc_read_point when restarting a scanner to get a consistent view + * of a row. + * </pre> + * + * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code> + */ + public long getMvccReadPoint() { + return mvccReadPoint_; + } + /** + * <pre> + * The mvcc read point which is used to open the scanner at server side. Client can + * make use of this mvcc_read_point when restarting a scanner to get a consistent view + * of a row. + * </pre> + * + * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code> + */ + public Builder setMvccReadPoint(long value) { + bitField0_ |= 0x00000400; + mvccReadPoint_ = value; + onChanged(); + return this; + } + /** + * <pre> + * The mvcc read point which is used to open the scanner at server side. Client can + * make use of this mvcc_read_point when restarting a scanner to get a consistent view + * of a row. + * </pre> + * + * <code>optional uint64 mvcc_read_point = 11 [default = 0];</code> + */ + public Builder clearMvccReadPoint() { + bitField0_ = (bitField0_ & ~0x00000400); + mvccReadPoint_ = 0L; + onChanged(); + return this; + } public final Builder setUnknownFields( final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFields(unknownFields); @@ -40434,7 +40658,7 @@ public final class ClientProtos { "tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" + "_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" + "\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " + - "\001(\010\"\275\004\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + + "\001(\010\"\331\004\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + "Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" + "eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" + "w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" + @@ -40448,96 +40672,98 @@ public final class ClientProtos { " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" + "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " + "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + - "lumnFamilyTimeRange\"\246\002\n\013ScanRequest\022)\n\006r", - "egion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034" + - "\n\004scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_" + - "id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclos" + - "e_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037" + - "\n\027client_handles_partials\030\007 \001(\010\022!\n\031clien" + - "t_handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan" + - "_metrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\"\232\002\n" + - "\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022" + - "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + - "\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase.", - "pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag" + - "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + - "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + - "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + - "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + - "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" + - "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" + - "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" + - "\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" + - "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(", - "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014" + - "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" + - "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" + - "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" + - " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" + - "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" + - "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" + - "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" + - "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" + - "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013", - "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu" + - "lkLoadResponse\"a\n\026CoprocessorServiceCall" + - "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" + - "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" + - "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" + - "base.pb.NameBytesPair\"v\n\031CoprocessorServ" + - "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" + - "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" + - "oprocessorServiceCall\"o\n\032CoprocessorServ" + - "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R", - "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb" + - ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" + - "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" + - "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" + - "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" + - "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" + - "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" + - "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" + - "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" + - "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction", - "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat" + - "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" + - "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" + - "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" + - "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" + - "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" + - "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" + - "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" + - "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" + - "ActionResult\0226\n\021resultOrException\030\001 \003(\0132", - "\033.hbase.pb.ResultOrException\022*\n\texceptio" + - "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" + - "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" + - "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" + - "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" + - "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" + - "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" + - "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" + - "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" + - "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS", - "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb" + - "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." + - "MutateRequest\032\030.hbase.pb.MutateResponse\022" + - "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" + - "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." + - "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" + - "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" + - "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" + - ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" + - "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!.", - "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec" + - "Service\022#.hbase.pb.CoprocessorServiceReq" + - "uest\032$.hbase.pb.CoprocessorServiceRespon" + - "se\022d\n\027ExecRegionServerService\022#.hbase.pb" + - ".CoprocessorServiceRequest\032$.hbase.pb.Co" + - "processorServiceResponse\0228\n\005Multi\022\026.hbas" + - "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" + - "seBI\n1org.apache.hadoop.hbase.shaded.pro" + - "tobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024", + " \001(\004:\0010\"\246\002\n\013ScanRequest\022)\n\006region\030\001 \001(\0132" + + "\031.hbase.pb.RegionSpecifier\022\034\n\004scan\030\002 \001(\013" + + "2\016.hbase.pb.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016" + + "number_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 " + + "\001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027client_han" + + "dles_partials\030\007 \001(\010\022!\n\031client_handles_he" + + "artbeats\030\010 \001(\010\022\032\n\022track_scan_metrics\030\t \001" + + "(\010\022\024\n\005renew\030\n \001(\010:\005false\"\266\002\n\014ScanRespons" + + "e\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_i" + + "d\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001", + "(\r\022!\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n" + + "\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result\030" + + "\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n" + + "\021heartbeat_message\030\t \001(\010\022+\n\014scan_metrics" + + "\030\n \001(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_re" + + "ad_point\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileReque" + + "st\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpec" + + "ifier\022>\n\013family_path\030\002 \003(\0132).hbase.pb.Bu" + + "lkLoadHFileRequest.FamilyPath\022\026\n\016assign_" + + "seq_num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.", + "pb.DelegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030" + + "\n\tcopy_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016" + + "\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoad" + + "HFileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegat" + + "ionToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password" + + "\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n" + + "\026PrepareBulkLoadRequest\022\'\n\ntable_name\030\001 " + + "\002(\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\013" + + "2\031.hbase.pb.RegionSpecifier\"-\n\027PrepareBu" + + "lkLoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cl", + "eanupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t" + + "\022)\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecif" + + "ier\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproc" + + "essorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service" + + "_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007requ" + + "est\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n" + + "\005value\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n" + + "\031CoprocessorServiceRequest\022)\n\006region\030\001 \002" + + "(\0132\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 " + + "\002(\0132 .hbase.pb.CoprocessorServiceCall\"o\n", + "\032CoprocessorServiceResponse\022)\n\006region\030\001 " + + "\002(\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030" + + "\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Actio" + + "n\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hba" + + "se.pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase" + + ".pb.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb" + + ".CoprocessorServiceCall\"k\n\014RegionAction\022" + + ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" + + "er\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hba" + + "se.pb.Action\"c\n\017RegionLoadStats\022\027\n\014memst", + "oreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:" + + "\0010\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Mul" + + "tiRegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbas" + + "e.pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hba" + + "se.pb.RegionLoadStats\"\336\001\n\021ResultOrExcept" + + "ion\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hba" + + "se.pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase." + + "pb.NameBytesPair\022:\n\016service_result\030\004 \001(\013" + + "2\".hbase.pb.CoprocessorServiceResult\0220\n\t" + + "loadStats\030\005 \001(\0132\031.hbase.pb.RegionLoadSta", + "tsB\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOr" + + "Exception\030\001 \003(\0132\033.hbase.pb.ResultOrExcep" + + "tion\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameB" + + "ytesPair\"x\n\014MultiRequest\022,\n\014regionAction" + + "\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceG" + + "roup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb" + + ".Condition\"\226\001\n\rMultiResponse\0228\n\022regionAc" + + "tionResult\030\001 \003(\0132\034.hbase.pb.RegionAction" + + "Result\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStati" + + "stics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadSt", + "ats*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELI" + + "NE\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb" + + ".GetRequest\032\025.hbase.pb.GetResponse\022;\n\006Mu" + + "tate\022\027.hbase.pb.MutateRequest\032\030.hbase.pb" + + ".MutateResponse\0225\n\004Scan\022\025.hbase.pb.ScanR" + + "equest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLo" + + "adHFile\022\036.hbase.pb.BulkLoadHFileRequest\032" + + "\037.hbase.pb.BulkLoadHFileResponse\022V\n\017Prep" + + "areBulkLoad\022 .hbase.pb.PrepareBulkLoadRe" + + "quest\032!.hbase.pb.PrepareBulkLoadResponse", + "\022V\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBu" + + "lkLoadRequest\032!.hbase.pb.CleanupBulkLoad" + + "Response\022X\n\013ExecService\022#.hbase.pb.Copro" + + "cessorServiceRequest\032$.hbase.pb.Coproces" + + "sorServiceResponse\022d\n\027ExecRegionServerSe" + + "rvice\022#.hbase.pb.CoprocessorServiceReque" + + "st\032$.hbase.pb.CoprocessorServiceResponse" + + "\0228\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbas" + + "e.pb.MultiResponseBI\n1org.apache.hadoop." + + "hbase.shaded.protobuf.generatedB\014ClientP", + "rotosH\001\210\001\001\240\001\001" }; org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -40639,7 +40865,7 @@ public final class ClientProtos { internal_static_hbase_pb_Scan_fieldAccessorTable = new org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_hbase_pb_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", }); + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", }); internal_static_hbase_pb_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new @@ -40651,7 +40877,7 @@ public final class ClientProtos { internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_hbase_pb_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", "MvccReadPoint", }); internal_static_hbase_pb_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hbase/blob/890fcbd0/hbase-protocol-shaded/src/main/protobuf/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index 2feaa26..9a7fea2 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -255,6 +255,7 @@ message Scan { optional uint32 caching = 17; optional bool allow_partial_results = 18; repeated ColumnFamilyTimeRange cf_time_range = 19; + optional uint64 mvcc_read_point = 20 [default = 0]; } /** @@ -317,17 +318,22 @@ message ScanResponse { // reasons such as the size in bytes or quantity of results accumulated. This field // will true when more results exist in the current region. optional bool more_results_in_region = 8; - + // This field is filled in if the server is sending back a heartbeat message. // Heartbeat messages are sent back to the client to prevent the scanner from // timing out. Seeing a heartbeat message communicates to the Client that the // server would have continued to scan had the time limit not been reached. optional bool heartbeat_message = 9; - + // This field is filled in if the client has requested that scan metrics be tracked. - // The metrics tracked here are sent back to the client to be tracked together with + // The metrics tracked here are sent back to the client to be tracked together with // the existing client side metrics. optional ScanMetrics scan_metrics = 10; + + // The mvcc read point which is used to open the scanner at server side. Client can + // make use of this mvcc_read_point when restarting a scanner to get a consistent view + // of a row. + optional uint64 mvcc_read_point = 11 [default = 0]; } /**