This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new bd0750f461 PHOENIX-5117 : Return the count of rows scanned in HBase
(#1936) (#2088)
bd0750f461 is described below
commit bd0750f461dfa123c91cee412b21eab83e9403b1
Author: Palash Chauhan <[email protected]>
AuthorDate: Sat Mar 15 10:15:19 2025 -0700
PHOENIX-5117 : Return the count of rows scanned in HBase (#1936) (#2088)
---
.../org/apache/phoenix/compile/QueryCompiler.java | 29 +-
.../apache/phoenix/compile/StatementContext.java | 12 +
.../org/apache/phoenix/execute/HashJoinPlan.java | 3 +
.../phoenix/iterate/MergeSortResultIterator.java | 5 +-
.../phoenix/iterate/OffsetResultIterator.java | 1 -
.../phoenix/iterate/OrderedResultIterator.java | 3 +-
.../org/apache/phoenix/jdbc/PhoenixResultSet.java | 9 +-
.../apache/phoenix/optimize/QueryOptimizer.java | 3 +-
.../hbase/regionserver/ScannerContextUtil.java | 26 +-
.../cache/aggcache/SpillableGroupByCache.java | 6 +
.../phoenix/coprocessor/BaseRegionScanner.java | 7 +-
.../coprocessor/BaseScannerRegionObserver.java | 4 +-
.../phoenix/coprocessor/DelegateRegionScanner.java | 32 +-
.../GroupedAggregateRegionObserver.java | 49 ++-
.../phoenix/coprocessor/HashJoinRegionScanner.java | 28 +-
.../coprocessor/IndexRebuildRegionScanner.java | 5 +
.../coprocessor/IndexRepairRegionScanner.java | 5 +
.../phoenix/coprocessor/IndexerRegionScanner.java | 5 +
.../phoenix/coprocessor/PagingRegionScanner.java | 55 +++-
.../coprocessor/PhoenixTTLRegionObserver.java | 6 +-
.../phoenix/coprocessor/TTLRegionScanner.java | 38 ++-
.../coprocessor/UncoveredIndexRegionScanner.java | 5 +
.../UncoveredLocalIndexRegionScanner.java | 5 +
.../UngroupedAggregateRegionObserver.java | 24 ++
.../UngroupedAggregateRegionScanner.java | 17 +-
.../apache/phoenix/index/GlobalIndexChecker.java | 19 +-
.../iterate/NonAggregateRegionScannerFactory.java | 92 +++++-
.../phoenix/iterate/RegionScannerFactory.java | 33 +-
.../iterate/RegionScannerResultIterator.java | 23 +-
.../org/apache/phoenix/end2end/ServerPagingIT.java | 40 +++
.../phoenix/monitoring/CountRowsScannedIT.java | 332 +++++++++++++++++++++
.../hbase/index/covered/NonTxIndexBuilderTest.java | 5 +
32 files changed, 826 insertions(+), 100 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index eb60562c32..e2be77cea9 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -116,20 +116,29 @@ public class QueryCompiler {
private final boolean optimizeSubquery;
private final Map<TableRef, QueryPlan> dataPlans;
private final boolean costBased;
+ private final StatementContext parentContext;
public QueryCompiler(PhoenixStatement statement, SelectStatement select,
ColumnResolver resolver, boolean projectTuples, boolean optimizeSubquery,
Map<TableRef, QueryPlan> dataPlans) throws SQLException {
this(statement, select, resolver, Collections.<PDatum>emptyList(),
null, new SequenceManager(statement), projectTuples, optimizeSubquery,
dataPlans);
}
public QueryCompiler(PhoenixStatement statement, SelectStatement select,
ColumnResolver resolver, BindManager bindManager, boolean projectTuples,
boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws
SQLException {
- this(statement, select, resolver, bindManager,
Collections.<PDatum>emptyList(), null, new SequenceManager(statement),
projectTuples, optimizeSubquery, dataPlans);
+ this(statement, select, resolver, bindManager,
Collections.<PDatum>emptyList(), null, new SequenceManager(statement),
projectTuples, optimizeSubquery, dataPlans, null);
}
public QueryCompiler(PhoenixStatement statement, SelectStatement select,
ColumnResolver resolver, List<? extends PDatum> targetColumns,
ParallelIteratorFactory parallelIteratorFactory, SequenceManager
sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef,
QueryPlan> dataPlans) throws SQLException {
- this(statement, select, resolver, new
BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory,
sequenceManager, projectTuples, optimizeSubquery, dataPlans);
+ this(statement, select, resolver, new
BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory,
sequenceManager, projectTuples, optimizeSubquery, dataPlans, null);
}
public QueryCompiler(PhoenixStatement statement, SelectStatement select,
ColumnResolver resolver, BindManager bindManager, List<? extends PDatum>
targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager
sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef,
QueryPlan> dataPlans) throws SQLException {
+ this(statement, select, resolver, bindManager, targetColumns,
parallelIteratorFactory, sequenceManager, projectTuples, optimizeSubquery,
dataPlans, null);
+ }
+
+ public QueryCompiler(PhoenixStatement statement, SelectStatement select,
ColumnResolver resolver, List<? extends PDatum> targetColumns,
ParallelIteratorFactory parallelIteratorFactory, SequenceManager
sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef,
QueryPlan> dataPlans, StatementContext parentContext) throws SQLException {
+ this(statement, select, resolver, new
BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory,
sequenceManager, projectTuples, optimizeSubquery, dataPlans, parentContext);
+ }
+
+ public QueryCompiler(PhoenixStatement statement, SelectStatement select,
ColumnResolver resolver, BindManager bindManager, List<? extends PDatum>
targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager
sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef,
QueryPlan> dataPlans, StatementContext parentContext) throws SQLException {
this.statement = statement;
this.select = select;
this.resolver = resolver;
@@ -152,6 +161,7 @@ public class QueryCompiler {
this.originalScan = ScanUtil.newScan(scan);
this.optimizeSubquery = optimizeSubquery;
this.dataPlans = dataPlans == null ? Collections.<TableRef,
QueryPlan>emptyMap() : dataPlans;
+ this.parentContext = parentContext;
}
public QueryCompiler(PhoenixStatement statement, SelectStatement select,
ColumnResolver resolver, List<? extends PDatum> targetColumns,
ParallelIteratorFactory parallelIteratorFactory, SequenceManager
sequenceManager) throws SQLException {
@@ -239,6 +249,9 @@ public class QueryCompiler {
public QueryPlan compileSelect(SelectStatement select) throws SQLException{
StatementContext context = createStatementContext();
+ if (parentContext != null) {
+ parentContext.addSubStatementContext(context);
+ }
if (select.isJoin()) {
JoinTable joinTable = JoinCompiler.compile(statement, select,
context.getResolver());
return compileJoinQuery(context, joinTable, false, false, null);
@@ -637,9 +650,13 @@ public class QueryCompiler {
return type == JoinType.Semi && complete;
}
+ protected QueryPlan compileSubquery(SelectStatement subquery, boolean
pushDownMaxRows) throws SQLException {
+ return compileSubquery(subquery, pushDownMaxRows, null);
+ }
+
protected QueryPlan compileSubquery(
SelectStatement subquerySelectStatement,
- boolean pushDownMaxRows) throws SQLException {
+ boolean pushDownMaxRows, StatementContext parentContext) throws
SQLException {
PhoenixConnection phoenixConnection = this.statement.getConnection();
RewriteResult rewriteResult =
ParseNodeUtil.rewrite(subquerySelectStatement,
phoenixConnection);
@@ -658,6 +675,9 @@ public class QueryCompiler {
statement,
queryPlan);
}
+ if (parentContext != null) {
+ parentContext.addSubStatementContext(queryPlan.getContext());
+ }
this.statement.setMaxRows(maxRows); // restore maxRows.
return queryPlan;
}
@@ -673,14 +693,13 @@ public class QueryCompiler {
throw new SQLException("RVC Offset not allowed with subqueries.");
}
- QueryPlan innerPlan = compileSubquery(innerSelect, false);
+ QueryPlan innerPlan = compileSubquery(innerSelect, false, context);
if (innerPlan instanceof UnionPlan) {
UnionCompiler.optimizeUnionOrderByIfPossible(
(UnionPlan) innerPlan,
select,
this::createStatementContext);
}
-
RowProjector innerQueryPlanRowProjector = innerPlan.getProjector();
TupleProjector tupleProjector = new
TupleProjector(innerQueryPlanRowProjector);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
index a7abbd5dd5..d35d1b1530 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PTime;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.NumberUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -86,6 +87,7 @@ public class StatementContext {
private boolean isClientSideUpsertSelect;
private boolean isUncoveredIndex;
private AtomicBoolean hasFirstValidResult;
+ private Set<StatementContext> subStatementContexts;
public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
@@ -114,6 +116,7 @@ public class StatementContext {
this.isClientSideUpsertSelect = context.isClientSideUpsertSelect;
this.isUncoveredIndex = context.isUncoveredIndex;
this.hasFirstValidResult = new
AtomicBoolean(context.getHasFirstValidResult());
+ this.subStatementContexts = Sets.newHashSet();
}
/**
* Constructor that lets you override whether or not to collect request level metrics.
@@ -159,6 +162,7 @@ public class StatementContext {
this.overAllQueryMetrics = new
OverAllQueryMetrics(isRequestMetricsEnabled,connection.getLogLevel());
this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap();
this.hasFirstValidResult = new AtomicBoolean(false);
+ this.subStatementContexts = Sets.newHashSet();
}
/**
@@ -390,4 +394,12 @@ public class StatementContext {
return retrying;
}
}
+
+ public void addSubStatementContext(StatementContext sub) {
+ subStatementContexts.add(sub);
+ }
+
+ public Set<StatementContext> getSubStatementContexts() {
+ return subStatementContexts;
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index fbe5edd161..259d718acb 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -153,6 +153,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
this.serverCacheLimit = services.getProps().getLongBytes(
QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
+ for (SubPlan subPlan: subPlans) {
+
this.getContext().addSubStatementContext(subPlan.getInnerPlan().getContext());
+ }
}
@Override
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
index d094bec568..8392ab6f48 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -56,7 +56,10 @@ public abstract class MergeSortResultIterator implements
PeekingResultIterator {
@Override
public Tuple next() throws SQLException {
MaterializedComparableResultIterator iterator = minIterator();
- if (iterator == null) { return null; }
+ if (iterator == null) {
+ close();
+ return null;
+ }
Tuple next = iterator.next();
minHeap.poll();
if (iterator.peek() != null) {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index af23429cca..f16bb84948 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -22,7 +22,6 @@ import java.util.List;
import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
import static org.apache.phoenix.util.ScanUtil.isDummy;
-
import
org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.EnvironmentEdgeManager;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 4e36ae1a2a..3d6713b3b8 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -217,7 +217,8 @@ public class OrderedResultIterator implements
PeekingResultIterator {
public OrderedResultIterator(ResultIterator delegate,
List<OrderByExpression> orderByExpressions, boolean
spoolingEnabled,
- long thresholdBytes, Integer limit, Integer offset, int
estimatedRowSize, long pageSizeMs) {
+ long thresholdBytes, Integer limit, Integer offset,
+ int estimatedRowSize, long pageSizeMs) {
checkArgument(!orderByExpressions.isEmpty());
this.delegate = delegate;
this.orderByExpressions = orderByExpressions;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 4d5f038e0a..fb2fcb3435 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -1480,7 +1480,14 @@ public class PhoenixResultSet implements
PhoenixMonitoredResultSet, SQLCloseable
@Override
public Map<String, Map<MetricType, Long>> getReadMetrics() {
- return readMetricsQueue.aggregate();
+ ReadMetricQueue one = readMetricsQueue;
+ if (context != null) {
+ for (StatementContext sub : context.getSubStatementContexts()) {
+ ReadMetricQueue subMetric = sub.getReadMetricsQueue();
+ one.combineReadMetrics(subMetric);
+ }
+ }
+ return one.aggregate();
}
@Override
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index faec49322b..42cf622824 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -195,7 +195,8 @@ public class QueryOptimizer {
dataPlan.getContext().getSequenceManager(),
true,
true,
- dataPlans);
+ dataPlans,
+ dataPlan.getContext());
return Collections.singletonList(compiler.compile());
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
index 3103457325..7bdb2c0b8b 100644
---
a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++
b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
/**
@@ -36,4 +36,28 @@ public class ScannerContextUtil {
cell.heapSize());
}
}
+
+ public static void updateMetrics(ScannerContext src, ScannerContext dst) {
+ if (src != null && dst != null && src.isTrackingMetrics() &&
dst.isTrackingMetrics()) {
+ for (Map.Entry<String, Long> entry :
src.getMetrics().getMetricsMap().entrySet()) {
+ dst.metrics.addToCounter(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ public static ScannerContext copyNoLimitScanner(ScannerContext sc) {
+ return new ScannerContext(sc.keepProgress, null,
sc.isTrackingMetrics());
+ }
+
+ public static void updateTimeProgress(ScannerContext sc) {
+ sc.updateTimeProgress();
+ }
+
+ /**
+ * Set returnImmediately on the ScannerContext to true, it will have the same behavior
+ * as reaching the time limit. Use this to make RSRpcService.scan return immediately.
+ */
+ public static void setReturnImmediately(ScannerContext sc) {
+ sc.returnImmediately();
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index a753365692..f677b081f2 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
@@ -365,6 +366,11 @@ public class SpillableGroupByCache implements GroupByCache
{
}
}
+ public boolean next(List<Cell> result, ScannerContext
scannerContext)
+ throws IOException {
+ return next(result);
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
if (!cacheIter.hasNext()) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index 8533bb6391..23a9f075c4 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -42,9 +42,8 @@ public abstract class BaseRegionScanner extends
DelegateRegionScanner {
public abstract boolean next(List<Cell> results) throws IOException;
@Override
- public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- throw new IOException("Next with scannerContext should not be called
in Phoenix environment");
- }
+ public abstract boolean next(List<Cell> result, ScannerContext
scannerContext)
+ throws IOException;
@Override
public boolean reseek(byte[] row) throws IOException {
@@ -58,6 +57,6 @@ public abstract class BaseRegionScanner extends
DelegateRegionScanner {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- throw new IOException("NextRaw with scannerContext should not be
called in Phoenix environment");
+ return next(result, scannerContext);
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 52062aa3de..d893f7f455 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -237,7 +237,7 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
@Override
public boolean next(List<Cell> result, ScannerContext
scannerContext) throws IOException {
overrideDelegate();
- boolean res = super.next(result);
+ boolean res = super.next(result, scannerContext);
ScannerContextUtil.incrementSizeProgress(scannerContext,
result);
return res;
}
@@ -251,7 +251,7 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext
scannerContext) throws IOException {
overrideDelegate();
- boolean res = super.nextRaw(result);
+ boolean res = super.nextRaw(result, scannerContext);
ScannerContextUtil.incrementSizeProgress(scannerContext,
result);
return res;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index bbd51f3396..3d74243191 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
+
+import static org.apache.phoenix.util.ScanUtil.isDummy;
public class DelegateRegionScanner implements RegionScanner {
@@ -61,22 +64,22 @@ public class DelegateRegionScanner implements RegionScanner
{
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- throw new IOException("Next with scannerContext should not be called
in Phoenix environment");
+ return next(result, false, scannerContext);
}
@Override
public boolean next(List<Cell> result) throws IOException {
- return delegate.next(result);
+ return next(result, false, null);
}
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- throw new IOException("NextRaw with scannerContext should not be
called in Phoenix environment");
+ return next(result, true, scannerContext);
}
@Override
- public boolean nextRaw(List<Cell> arg0) throws IOException {
- return delegate.nextRaw(arg0);
+ public boolean nextRaw(List<Cell> result) throws IOException {
+ return next(result, true, null);
}
@Override
@@ -97,4 +100,23 @@ public class DelegateRegionScanner implements RegionScanner
{
throw new DoNotRetryIOException(e);
}
}
+
+ private boolean next(List<Cell> result, boolean raw, ScannerContext
scannerContext)
+ throws IOException {
+ if (scannerContext != null) {
+ ScannerContext noLimitContext = ScannerContextUtil
+ .copyNoLimitScanner(scannerContext);
+ boolean hasMore = raw
+ ? delegate.nextRaw(result, noLimitContext)
+ : delegate.next(result, noLimitContext);
+ if (isDummy(result)) {
+ // when a dummy row is returned by a lower layer, set returnImmediately
+ // on the ScannerContext to force HBase to return a response to the client
+ ScannerContextUtil.setReturnImmediately(scannerContext);
+ }
+ ScannerContextUtil.updateMetrics(noLimitContext, scannerContext);
+ return hasMore;
+ }
+ return raw ? delegate.nextRaw(result) : delegate.next(result);
+ }
}
\ No newline at end of file
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 29e8c0a147..f36412f03f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
@@ -352,6 +353,11 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
return new BaseRegionScanner(s) {
private int index = 0;
+ public boolean next(List<Cell> result, ScannerContext
scannerContext)
+ throws IOException {
+ return next(result);
+ }
+
@Override
public void close() throws IOException {
try {
@@ -482,8 +488,20 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
}
}
+ @Override
+ public boolean nextRaw(List<Cell> results, ScannerContext
scannerContext)
+ throws IOException {
+ return next(results, scannerContext);
+ }
+
@Override
public boolean next(List<Cell> resultsToReturn) throws IOException {
+ return next(resultsToReturn, null);
+ }
+
+ @Override
+ public boolean next(List<Cell> resultsToReturn, ScannerContext
scannerContext)
+ throws IOException {
if (firstScan && actualScanStartRowKey != null) {
if (scanStartRowKey.length > 0 &&
!ScanUtil.isLocalIndex(scan)) {
if (hasRegionMoved()) {
@@ -522,7 +540,7 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
if (firstScan) {
firstScan = false;
}
- boolean moreRows = nextInternal(resultsToReturn);
+ boolean moreRows = nextInternal(resultsToReturn, scannerContext);
if (ScanUtil.isDummy(resultsToReturn)) {
return true;
}
@@ -560,7 +578,7 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
// If includeStartRowKey is false and the current
rowkey is matching
// with scanStartRowKey, return the next row result.
resultsToReturn.clear();
- moreRows = nextInternal(resultsToReturn);
+ moreRows = nextInternal(resultsToReturn,
scannerContext);
if (ScanUtil.isDummy(resultsToReturn)) {
return true;
}
@@ -578,7 +596,7 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
// If includeStartRowKey is true and the (current
rowkey + "\0xx") is
// matching with scanStartRowKey, return the next
row result.
resultsToReturn.clear();
- moreRows = nextInternal(resultsToReturn);
+ moreRows = nextInternal(resultsToReturn,
scannerContext);
if (ScanUtil.isDummy(resultsToReturn)) {
return true;
}
@@ -590,7 +608,7 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
}
// In the loop, keep iterating through rows.
resultsToReturn.clear();
- moreRows = nextInternal(resultsToReturn);
+ moreRows = nextInternal(resultsToReturn, scannerContext);
if (ScanUtil.isDummy(resultsToReturn)) {
return true;
}
@@ -609,7 +627,8 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
* @return true if more rows exist after this one, false if scanner is done.
* @throws IOException if something goes wrong.
*/
- private boolean nextInternal(List<Cell> resultsToReturn) throws
IOException {
+ private boolean nextInternal(List<Cell> resultsToReturn,
ScannerContext scannerContext)
+ throws IOException {
boolean hasMore;
long startTime = EnvironmentEdgeManager.currentTimeMillis();
long now;
@@ -620,7 +639,7 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
acquiredLock = true;
synchronized (delegate) {
if (regionScanner != null) {
- return regionScanner.next(resultsToReturn);
+ return regionScanner.next(resultsToReturn,
scannerContext);
}
do {
List<Cell> results = useQualifierAsIndex ?
@@ -632,7 +651,9 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
// since this is an indication of whether or not there are
// more values after the
// ones returned
- hasMore = delegate.nextRaw(results);
+ hasMore = (scannerContext == null)
+ ? delegate.nextRaw(results)
+ : delegate.nextRaw(results,
scannerContext);
if (!results.isEmpty()) {
if (isDummy(results)) {
return getDummyResult(resultsToReturn);
@@ -779,8 +800,18 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
}
}
+ @Override
+ public boolean nextRaw(List<Cell> results, ScannerContext
scannerContext)
+ throws IOException {
+ return next(results, scannerContext);
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
+ return next(results, null);
+ }
+ @Override
+ public boolean next(List<Cell> results, ScannerContext scannerContext)
throws IOException {
boolean hasMore;
boolean atLimit;
boolean aggBoundary = false;
@@ -807,7 +838,9 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver im
// since this is an indication of whether or not there
// are more values after the
// ones returned
- hasMore = delegate.nextRaw(kvs);
+ hasMore = (scannerContext == null)
+ ? delegate.nextRaw(kvs)
+ : delegate.nextRaw(kvs, scannerContext);
if (!kvs.isEmpty()) {
if (isDummy(kvs)) {
updateDummyWithPrevRowKey(results,
initStartRowKey,
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 487eb86aba..6a2b3459da 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -297,10 +297,26 @@ public class HashJoinRegionScanner implements
RegionScanner {
@Override
public boolean nextRaw(List<Cell> result) throws IOException {
+ return next(result, true, null);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ return next(result, true, scannerContext);
+ }
+
+
+ private boolean next(List<Cell> result, boolean raw, ScannerContext
scannerContext) throws IOException {
try {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (shouldAdvance()) {
- hasMore = scanner.nextRaw(result);
+ if (scannerContext != null) {
+ hasMore = raw
+ ? scanner.nextRaw(result, scannerContext)
+ : scanner.next(result, scannerContext);
+ } else {
+ hasMore = raw ? scanner.nextRaw(result) :
scanner.next(result);
+ }
if (isDummy(result)) {
return true;
}
@@ -325,12 +341,6 @@ public class HashJoinRegionScanner implements
RegionScanner {
}
}
- @Override
- public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
- throws IOException {
- throw new IOException("Next with scannerContext should not be called
in Phoenix environment");
- }
-
@Override
public boolean reseek(byte[] row) throws IOException {
return scanner.reseek(row);
@@ -343,12 +353,12 @@ public class HashJoinRegionScanner implements
RegionScanner {
@Override
public boolean next(List<Cell> result) throws IOException {
- throw new IOException("Next should not be used in HashJoin scanner");
+ return next(result, false, null);
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- throw new IOException("Next with scannerContext should not be called
in Phoenix environment");
+ return next(result, false, scannerContext);
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 9b7402149f..960a1a8cfb 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.Cell;
@@ -429,4 +430,8 @@ public class IndexRebuildRegionScanner extends
GlobalIndexRegionScanner {
results.add(aggKeyValue);
return hasMore || hasMoreIncr;
}
+
+ public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ return next(result);
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index 19707bd533..bc87704e29 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -479,4 +480,8 @@ public class IndexRepairRegionScanner extends
GlobalIndexRegionScanner {
results.add(aggKeyValue);
return hasMore || hasMoreIncr;
}
+
+ public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ return next(result);
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 42e51d6606..0bb4249581 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -50,6 +50,7 @@ import
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@@ -477,4 +478,8 @@ public class IndexerRegionScanner extends
GlobalIndexRegionScanner {
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
+
+ public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ return next(result);
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index 719ce43731..66aa9598a8 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.filter.PagingFilter;
import org.apache.phoenix.filter.SkipScanFilter;
@@ -130,12 +131,20 @@ public class PagingRegionScanner extends
BaseRegionScanner {
private boolean hasMore() {
return lookupPosition < pointLookupRanges.size();
}
- private boolean next(List<Cell> results, boolean raw, RegionScanner
scanner)
- throws IOException {
+ private boolean next(List<Cell> results, boolean raw, RegionScanner
scanner,
+ ScannerContext scannerContext) throws IOException
{
try {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (true) {
- if (raw ? scanner.nextRaw(results) :
scanner.next(results)) {
+ boolean hasMore;
+ if (scannerContext != null) {
+ hasMore = raw
+ ? scanner.nextRaw(results, scannerContext)
+ : scanner.next(results, scannerContext);
+ } else {
+ hasMore = raw ? scanner.nextRaw(results) :
scanner.next(results);
+ }
+ if (hasMore) {
// Since each scan is supposed to return only one row (even when the
// start and stop row key are not the same, which happens after region
// moves or when there are delete markers in the table), this should not
@@ -197,7 +206,8 @@ public class PagingRegionScanner extends BaseRegionScanner {
initialized = true;
}
- private boolean next(List<Cell> results, boolean raw) throws IOException {
+ private boolean next(List<Cell> results, boolean raw, ScannerContext
scannerContext)
+ throws IOException {
init();
if (pagingFilter != null) {
pagingFilter.init();
@@ -230,19 +240,26 @@ public class PagingRegionScanner extends
BaseRegionScanner {
} else {
if (multiKeyPointLookup != null) {
- RegionScanner regionScanner =
multiKeyPointLookup.getNewScanner();
- if (regionScanner == null) {
- return false;
- }
- delegate.close();
- delegate = regionScanner;
+ RegionScanner regionScanner =
multiKeyPointLookup.getNewScanner();
+ if (regionScanner == null) {
+ return false;
+ }
+ delegate.close();
+ delegate = regionScanner;
}
}
if (multiKeyPointLookup != null) {
- return multiKeyPointLookup.next(results, raw, delegate);
+ return multiKeyPointLookup.next(results, raw, delegate,
scannerContext);
+ }
+ boolean hasMore;
+ if (scannerContext != null) {
+ hasMore = raw
+ ? delegate.nextRaw(results, scannerContext)
+ : delegate.next(results, scannerContext);
+ } else {
+ hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
}
- boolean hasMore = raw ? delegate.nextRaw(results) :
delegate.next(results);
if (pagingFilter == null) {
return hasMore;
}
@@ -268,12 +285,22 @@ public class PagingRegionScanner extends
BaseRegionScanner {
@Override
public boolean next(List<Cell> results) throws IOException {
- return next(results, false);
+ return next(results, false, null);
}
@Override
public boolean nextRaw(List<Cell> results) throws IOException {
- return next(results, true);
+ return next(results, true, null);
+ }
+
+ @Override
+ public boolean next(List<Cell> results, ScannerContext scannerContext)
throws IOException {
+ return next(results, false, scannerContext);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
throws IOException {
+ return next(results, true, scannerContext);
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
index cc5d416513..810d965120 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
@@ -176,14 +176,12 @@ public class PhoenixTTLRegionObserver extends
BaseScannerRegionObserver implemen
@Override public boolean next(List<Cell> result, ScannerContext
scannerContext)
throws IOException {
- throw new IOException(
- "next with scannerContext should not be called in Phoenix environment");
+ return next(result);
}
@Override public boolean nextRaw(List<Cell> result, ScannerContext
scannerContext)
throws IOException {
- throw new IOException(
- "NextRaw with scannerContext should not be called in Phoenix environment");
+ return nextRaw(result);
}
@Override public void close() throws IOException {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 57fcc60c44..6a7c5c58fa 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -210,15 +211,32 @@ public class TTLRegionScanner extends BaseRegionScanner {
return false;
}
- private boolean next(List<Cell> result, boolean raw) throws IOException {
+ private boolean next(List<Cell> result, boolean raw, ScannerContext
scannerContext)
+ throws IOException {
+ boolean hasMore;
if (!isMaskingEnabled) {
- return raw ? delegate.nextRaw(result) : delegate.next(result);
+ if (scannerContext != null) {
+ hasMore = raw
+ ? delegate.nextRaw(result, scannerContext)
+ : delegate.next(result, scannerContext);
+ } else {
+ hasMore = raw ? delegate.nextRaw(result) :
delegate.next(result);
+ }
+ return hasMore;
}
if (!initialized) {
init();
initialized = true;
}
- boolean hasMore = raw ? delegate.nextRaw(result) :
delegate.next(result);
+
+ if (scannerContext != null) {
+ hasMore = raw
+ ? delegate.nextRaw(result, scannerContext)
+ : delegate.next(result, scannerContext);
+ } else {
+ hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+ }
+
if (result.isEmpty() || ScanUtil.isDummy(result)) {
return hasMore;
}
@@ -235,12 +253,22 @@ public class TTLRegionScanner extends BaseRegionScanner {
@Override
public boolean next(List<Cell> results) throws IOException {
- return next(results, false);
+ return next(results, false, null);
}
@Override
public boolean nextRaw(List<Cell> results) throws IOException {
- return next(results, true);
+ return next(results, true, null);
+ }
+
+ @Override
+ public boolean next(List<Cell> results, ScannerContext scannerContext)
throws IOException {
+ return next(results, false, scannerContext);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
throws IOException {
+ return next(results, true, scannerContext);
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index ec61489af7..691b9265ba 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -31,6 +31,7 @@ import
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.execute.TupleProjector;
@@ -359,6 +360,10 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
}
}
+ public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ return next(result);
+ }
+
/**
* A page of index rows are scanned and then their corresponding data table rows are retrieved
* from the data table regions in parallel. These data rows are then joined with index rows.
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index a69b5a2e89..39b8733a0e 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -25,6 +25,7 @@ import
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
@@ -129,4 +130,8 @@ public class UncoveredLocalIndexRegionScanner extends
UncoveredIndexRegionScanne
ServerIndexUtil.wrapResultUsingOffset(result, offset);
return hasMore;
}
+
+ public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ return next(result);
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b0b1d4196d..986c7209ab 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -750,6 +752,13 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
private RegionScanner collectStats(final RegionScanner innerScanner,
StatisticsCollector stats,
final Region region, final Scan scan,
Configuration config) throws IOException {
+ ScannerContext groupScannerContext;
+ if (scan.isScanMetricsEnabled()) {
+ groupScannerContext = ScannerContext.newBuilder()
+ .setTrackMetrics(scan.isScanMetricsEnabled()).build();
+ } else {
+ groupScannerContext = null;
+ }
StatsCollectionCallable callable =
new StatsCollectionCallable(stats, region, innerScanner,
config, scan);
byte[] asyncBytes =
scan.getAttribute(BaseScannerRegionObserverConstants.RUN_UPDATE_STATS_ASYNC_ATTRIB);
@@ -799,6 +808,21 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
}
}
+ @Override
+ public boolean next(List<Cell> results, ScannerContext
scannerContext)
+ throws IOException {
+ if (groupScannerContext != null && scannerContext != null) {
+ ScannerContextUtil.updateMetrics(groupScannerContext,
scannerContext);
+ }
+ return next(results);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> results, ScannerContext
scannerContext)
+ throws IOException {
+ return next(results, scannerContext);
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
results.add(aggKeyValue);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 65f76c78e7..dc574c015e 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -63,6 +63,7 @@ import
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.GlobalCache;
@@ -560,8 +561,20 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
}
}
+ @Override
+ public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
+ throws IOException {
+ return next(results, scannerContext);
+ }
+
@Override
public boolean next(List<Cell> resultsToReturn) throws IOException {
+ return next(resultsToReturn, null);
+ }
+
+ @Override
+ public boolean next(List<Cell> resultsToReturn, ScannerContext
scannerContext)
+ throws IOException {
boolean hasMore;
long startTime = EnvironmentEdgeManager.currentTimeMillis();
Configuration conf = env.getConfiguration();
@@ -589,7 +602,9 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
// Results are potentially returned even when the return value of s.next is false
// since this is an indication of whether or not there are more values after the
// ones returned
- hasMore = innerScanner.nextRaw(results);
+ hasMore = (scannerContext == null)
+ ? innerScanner.nextRaw(results)
+ : innerScanner.nextRaw(results, scannerContext);
if (isDummy(results)) {
if (!hasAny) {
resultsToReturn.addAll(results);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 4dbf5c1f6d..62b8677103 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -252,7 +252,8 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
return true;
}
- public boolean next(List<Cell> result, boolean raw) throws IOException
{
+ public boolean next(List<Cell> result, boolean raw, ScannerContext
scannerContext)
+ throws IOException {
try {
if (!initialized) {
init();
@@ -261,9 +262,13 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
long startTime = EnvironmentEdgeManager.currentTimeMillis();
do {
if (raw) {
- hasMore = scanner.nextRaw(result);
+ hasMore = (scannerContext == null)
+ ? scanner.nextRaw(result)
+ : scanner.nextRaw(result, scannerContext);
} else {
- hasMore = scanner.next(result);
+ hasMore = (scannerContext == null)
+ ? scanner.next(result)
+ : scanner.next(result, scannerContext);
}
if (result.isEmpty()) {
return hasMore;
@@ -298,22 +303,22 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
@Override
public boolean next(List<Cell> result) throws IOException {
- return next(result, false);
+ return next(result, false, null);
}
@Override
public boolean nextRaw(List<Cell> result) throws IOException {
- return next(result, true);
+ return next(result, true, null);
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- throw new IOException("next with scannerContext should not be called in Phoenix environment");
+ return next(result, false, scannerContext);
}
@Override
public boolean nextRaw(List<Cell> result, ScannerContext
scannerContext) throws IOException {
- throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
+ return next(result, true, scannerContext);
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 12873c9081..ddb04a9c82 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -41,6 +41,8 @@ import
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
@@ -172,19 +174,21 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
if (scanOffset != null) {
final boolean isIncompatibleClient =
ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan);
+ RegionScannerResultIterator iterator = new
RegionScannerResultIterator(scan,
+ innerScanner,
+ getMinMaxQualifiersFromScan(scan),
+ encodingScheme);
+ ScannerContext sc = iterator.getRegionScannerContext();
innerScanner = getOffsetScanner(
innerScanner,
new OffsetResultIterator(
- new RegionScannerResultIterator(
- innerScanner,
- getMinMaxQualifiersFromScan(scan),
- encodingScheme),
+ iterator,
scanOffset,
getPageSizeMsForRegionScanner(scan),
isIncompatibleClient),
scan.getAttribute(QueryConstants.LAST_SCAN) != null,
isIncompatibleClient,
- scan);
+ scan, sc);
}
boolean spoolingEnabled =
env.getConfiguration().getBoolean(
@@ -194,21 +198,22 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
env.getConfiguration()
.getLongBytes(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
- final OrderedResultIterator iterator =
- deserializeFromScan(scan, innerScanner, spoolingEnabled,
thresholdBytes);
+ OrderedResultIteratorWithScannerContext ic
+ = deserializeFromScan(scan, innerScanner, spoolingEnabled,
thresholdBytes);
+ final OrderedResultIterator iterator = ic.getIterator();
if (iterator == null) {
return innerScanner;
}
// TODO:the above wrapped scanner should be used here also
- return getTopNScanner(env, innerScanner, iterator, tenantId);
+ return getTopNScanner(env, innerScanner, iterator, tenantId,
ic.getScannerContext());
}
@VisibleForTesting
- static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner
s,
- boolean spoolingEnabled,
long thresholdBytes) {
+ static OrderedResultIteratorWithScannerContext deserializeFromScan(Scan
scan, RegionScanner s,
+ boolean spoolingEnabled,
long thresholdBytes) {
byte[] topN =
scan.getAttribute(BaseScannerRegionObserverConstants.TOPN);
if (topN == null) {
- return null;
+ return new OrderedResultIteratorWithScannerContext(null, null);
}
int clientVersion = ScanUtil.getClientVersion(scan);
// Client including and after 4.15 and 5.1 are not going to serialize thresholdBytes
@@ -239,11 +244,14 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
PTable.QualifierEncodingScheme encodingScheme =
EncodedColumnsUtil.getQualifierEncodingScheme(scan);
- ResultIterator inner = new RegionScannerResultIterator(s,
+ RegionScannerResultIterator inner = new
RegionScannerResultIterator(scan, s,
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan),
encodingScheme);
- return new OrderedResultIterator(inner, orderByExpressions,
spoolingEnabled,
+ OrderedResultIterator iterator
+ = new OrderedResultIterator(inner, orderByExpressions,
spoolingEnabled,
thresholdBytes, limit >= 0 ? limit : null, null,
estimatedRowSize,
getPageSizeMsForRegionScanner(scan), scan,
s.getRegionInfo());
+ return new
OrderedResultIteratorWithScannerContext(inner.getRegionScannerContext(),
+ iterator);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
@@ -255,6 +263,24 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
}
+ private static class OrderedResultIteratorWithScannerContext {
+ private ScannerContext scannerContext;
+ private OrderedResultIterator iterator;
+
+ OrderedResultIteratorWithScannerContext(ScannerContext sc,
OrderedResultIterator ori) {
+ this.scannerContext = sc;
+ this.iterator = ori;
+ }
+
+ public ScannerContext getScannerContext() {
+ return scannerContext;
+ }
+
+ public OrderedResultIterator getIterator() {
+ return iterator;
+ }
+ }
+
private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan
scan, RegionScanner s,
Set<KeyValueColumnExpression> arrayKVRefs) {
byte[] specificArrayIdx =
scan.getAttribute(BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX);
@@ -295,7 +321,7 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
final OffsetResultIterator iterator,
final boolean isLastScan,
final boolean incompatibleClient,
- final Scan scan)
+ final Scan scan, final
ScannerContext sc)
throws IOException {
final Tuple firstTuple;
final Region region = getRegion();
@@ -372,6 +398,7 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
return new BaseRegionScanner(s) {
private Tuple tuple = firstTuple;
private byte[] previousResultRowKey;
+ private ScannerContext regionScannerContext = sc;
@Override
public boolean isFilterDone() {
@@ -380,6 +407,12 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
@Override
public boolean next(List<Cell> results) throws IOException {
+ return next(results, null);
+ }
+
+ @Override
+ public boolean next(List<Cell> results, ScannerContext
scannerContext)
+ throws IOException {
try {
if (isFilterDone()) {
return false;
@@ -413,6 +446,10 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
}
tuple = nextTuple;
+ if (regionScannerContext != null) {
+ ScannerContextUtil.updateMetrics(regionScannerContext,
scannerContext);
+ regionScannerContext = null;
+ }
return !isFilterDone();
} catch (Throwable t) {
LOGGER.error("Error while iterating Offset scanner.", t);
@@ -421,6 +458,12 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
}
+ @Override
+ public boolean nextRaw(List<Cell> results, ScannerContext
scannerContext)
+ throws IOException {
+ return next(results, scannerContext);
+ }
+
@Override
public void close() throws IOException {
try {
@@ -475,7 +518,9 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
* since after this everything is held in memory
*/
private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env,
final RegionScanner s,
- final OrderedResultIterator iterator,
ImmutableBytesPtr tenantId) throws Throwable {
+ final OrderedResultIterator iterator,
+ ImmutableBytesPtr tenantId,
ScannerContext sc)
+ throws Throwable {
final Tuple firstTuple;
TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
@@ -498,6 +543,7 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
return new BaseRegionScanner(s) {
private Tuple tuple = firstTuple;
+ private ScannerContext regionScannerContext = sc;
@Override
public boolean isFilterDone() {
@@ -506,6 +552,12 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
@Override
public boolean next(List<Cell> results) throws IOException {
+ return next(results, null);
+ }
+
+ @Override
+ public boolean next(List<Cell> results, ScannerContext
scannerContext)
+ throws IOException {
try {
if (isFilterDone()) {
return false;
@@ -518,6 +570,10 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
}
tuple = iterator.next();
+ if (regionScannerContext != null) {
+ ScannerContextUtil.updateMetrics(regionScannerContext,
scannerContext);
+ regionScannerContext = null;
+ }
return !isFilterDone();
} catch (Throwable t) {
ClientUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
@@ -525,6 +581,12 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
}
+ @Override
+ public boolean nextRaw(List<Cell> results, ScannerContext
scannerContext)
+ throws IOException {
+ return next(results, scannerContext);
+ }
+
@Override
public void close() throws IOException {
try {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 112bb5f63e..1b93d7f82f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -209,8 +209,15 @@ public abstract class RegionScannerFactory {
@Override
public boolean next(List<Cell> results) throws IOException {
+ return next(results, null);
+ }
+
+ @Override
+ public boolean next(List<Cell> results, ScannerContext scannerContext)
throws IOException {
try {
- boolean next = s.next(results);
+ boolean next = (scannerContext == null)
+ ? s.next(results)
+ : s.next(results, scannerContext);
if (ScanUtil.isDummy(results)) {
return true;
}
@@ -221,10 +228,6 @@ public abstract class RegionScannerFactory {
}
}
- @Override
- public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- throw new IOException("Next with scannerContext should not be called in Phoenix environment");
- }
@Override
public void close() throws IOException {
@@ -253,8 +256,15 @@ public abstract class RegionScannerFactory {
@Override
public boolean nextRaw(List<Cell> result) throws IOException {
+ return nextRaw(result, null);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
try {
- boolean next = s.nextRaw(result);
+ boolean next = (scannerContext == null)
+ ? s.nextRaw(result)
+ : s.nextRaw(result, scannerContext);
if (ScanUtil.isDummy(result)) {
return true;
}
@@ -309,6 +319,10 @@ public abstract class RegionScannerFactory {
return false;
}
// There is a scanattribute set to retrieve the specific array element
+ if (scannerContext != null) {
+ ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ ScannerContextUtil.updateTimeProgress(scannerContext);
+ }
return next;
} catch (Throwable t) {
ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(),
t);
@@ -369,13 +383,6 @@ public abstract class RegionScannerFactory {
return new Pair<>(tuple, new byte[0]);
}
- @Override
- public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
- throws IOException {
- boolean res = next(result);
- ScannerContextUtil.incrementSizeProgress(scannerContext, result);
- return res;
- }
/**
* When there is a merge in progress while scanning local indexes we might get the key values less than scan start row.
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index b05696e758..d186ef881e 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,7 +24,9 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
@@ -43,12 +45,21 @@ public class RegionScannerResultIterator extends
BaseResultIterator {
private final Pair<Integer, Integer> minMaxQualifiers;
private final boolean useQualifierAsIndex;
private final QualifierEncodingScheme encodingScheme;
+ private final ScannerContext regionScannerContext;
- public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer,
Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
+ public RegionScannerResultIterator(Scan scan, RegionScanner scanner,
+ Pair<Integer, Integer> minMaxQualifiers,
+ QualifierEncodingScheme encodingScheme)
{
this.scanner = scanner;
this.useQualifierAsIndex =
EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
this.minMaxQualifiers = minMaxQualifiers;
this.encodingScheme = encodingScheme;
+ if (scan.isScanMetricsEnabled()) { // build a metrics-tracking context only when the scan asks for metrics
+ regionScannerContext = ScannerContext.newBuilder()
+ .setTrackMetrics(scan.isScanMetricsEnabled()).build(); // the flag is always true on this branch
+ } else {
+ regionScannerContext = null; // no context is created when scan metrics are off
+ }
}
@Override
@@ -62,7 +73,12 @@ public class RegionScannerResultIterator extends
BaseResultIterator {
// Results are potentially returned even when the return value
of s.next is false
// since this is an indication of whether or not there are
more values after the
// ones returned
- boolean hasMore = scanner.nextRaw(results);
+ boolean hasMore;
+ if (regionScannerContext == null) {
+ hasMore = scanner.nextRaw(results);
+ } else {
+ hasMore = scanner.nextRaw(results, regionScannerContext);
+ }
if (!hasMore && results.isEmpty()) {
return null;
@@ -81,6 +97,9 @@ public class RegionScannerResultIterator extends
BaseResultIterator {
}
}
+ public ScannerContext getRegionScannerContext() { // returns the context built in the constructor; null when scan metrics are disabled
+ return regionScannerContext;
+ }
@Override
public String toString() {
return "RegionScannerResultIterator [scanner=" + scanner + "]";
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index b82ac71454..f9fb3fc9e4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -37,6 +37,7 @@ import java.util.Properties;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PDate;
@@ -427,6 +428,34 @@ public class ServerPagingIT extends
ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testNumberOfRPCsWithPaging() throws SQLException {
+ // create the table and insert 200 rows (A = Z = 1..200)
+ Connection conn = DriverManager.getConnection(getUrl());
+ String tableName = generateUniqueName();
+ PhoenixStatement stmt =
conn.createStatement().unwrap(PhoenixStatement.class);
+ stmt.execute("CREATE TABLE " + tableName
+ + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
+ for (int i = 1; i <= 200; i++) {
+ String sql = String.format("UPSERT INTO %s VALUES (%d, %d)",
tableName, i, i);
+ stmt.execute(sql);
+ }
+ conn.commit();
+
+ // delete the odd-keyed rows (A = 1, 3, ..., 199), committing each delete separately
+ for (int i=1; i<=200; i=i+2) {
+ stmt.execute("DELETE FROM " + tableName + " WHERE A = " + i);
+ conn.commit();
+ }
+
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+ while (rs.next()) { // drain the result set; the metrics are read only afterwards
+ }
+ Map<String, Map<MetricType, Long>> metrics =
PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS); // summed over every entry of the metrics map
+ Assert.assertEquals(101, numRpc); // NOTE(review): the derivation of 101 is not shown here - confirm against the paging configuration
+ }
+
private void populateTable(String tableName) throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute("create table " + tableName +
@@ -438,4 +467,15 @@ public class ServerPagingIT extends
ParallelStatsDisabledIT {
conn.commit();
conn.close();
}
+
+ private long getMetricValue(Map<String, Map<MetricType, Long>> metrics,
MetricType type) {
+ long result = 0; // running sum of the requested metric over all outer-map entries
+ for (Map.Entry<String, Map<MetricType, Long>> entry :
metrics.entrySet()) {
+ Long val = entry.getValue().get(type);
+ if (val != null) { // entries that do not report this metric contribute nothing
+ result += val.longValue();
+ }
+ }
+ return result; // 0 when no entry reports the metric
+ }
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
new file mode 100644
index 0000000000..d074d4a483
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class CountRowsScannedIT extends BaseTest {
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+ props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true"); // the tests read request-level read metrics from the result set
+ // small scan cache size to force many rpc calls
+ props.put(QueryServices.SCAN_CACHE_SIZE_ATTRIB, "10");
+ setUpTestDriver(new ReadOnlyProps(props));
+ }
+
+ @Test
+ public void testSinglePrimaryKey() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ String tableName = generateUniqueName();
+ PhoenixStatement stmt =
conn.createStatement().unwrap(PhoenixStatement.class);
+ stmt.execute("CREATE TABLE " + tableName
+ + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
+ for (int i = 1; i <= 100; i++) { // 100 rows with A = Z = i
+ String sql = String.format("UPSERT INTO %s VALUES (%d, %d)",
tableName, i, i);
+ stmt.execute(sql);
+ }
+ conn.commit();
+
+ // filter on both columns, but primary key 3 to 100, 98 rows
+ long count1 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE A >= 3 AND Z >= 7");
+ assertEquals(98, count1);
+
+ // primary key, 3 to 100, 98 rows
+ long count2 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE A >= 3");
+ assertEquals(98, count2);
+
+ // non-primary key, all rows
+ long count3 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE Z >= 7");
+ assertEquals(100, count3);
+
+ // primary key with limit, the first row
+ long count4 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE A >= 3 limit 1");
+ assertEquals(1, count4);
+
+ // non-primary key with limit, find the first Z >= 7, 1 to 7, 7 rows
+ long count5 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 limit 1");
+ assertEquals(7, count5);
+
+ // primary key with order by primary and limit, 1 row
+ long count6 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE A >= 3 ORDER BY A
limit 1");
+ assertEquals(1, count6);
+
+ // primary key with order by non-primary and limit, 3 to 100, 98 rows
+ long count7 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE A >= 3 ORDER BY Z
limit 1");
+ assertEquals(98, count7);
+
+ // non-primary key with order by primary and limit, 1 to 7, 7 rows
+ long count8 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY A
limit 1");
+ assertEquals(7, count8);
+
+ // non-primary key with order by primary desc and limit
+ // scan from the last, 1 row
+ long count9 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY A
desc limit 1");
+ assertEquals(1, count9);
+
+ // non-primary key range with order by primary desc and limit
+ // scan from the last row down to the first match (Z = 60), 100 to 60, 41 rows
+ long count10 = countRowsScannedFromSql(stmt, "SELECT A,Z FROM " +
tableName
+ + " WHERE Z >= 7 AND Z <= 60 ORDER BY A desc limit 1");
+ assertEquals(41, count10);
+
+ // non-primary key with order by non-primary and limit, all rows
+ long count11 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY Z
limit 1");
+ assertEquals(100, count11);
+
+ // skip scan over the four listed keys, 4 rows
+ long count12 = countRowsScannedFromSql(stmt,
+ "SELECT A,Z FROM " + tableName + " WHERE A in (20, 45, 68,
3)");
+ assertEquals(4, count12);
+ }
+
+ @Test
+ public void testMultiPrimaryKeys() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ String tableName = generateUniqueName();
+ PhoenixStatement stmt =
conn.createStatement().unwrap(PhoenixStatement.class);
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName
+ + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, "
+ + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))");
+ for (int i = 1; i <= 100; i++) { // A cycles through 1..5, B = Z = i
+ String sql = String
+ .format("UPSERT INTO %s VALUES (%d, %d, %d)", tableName,
(i % 5) + 1, i, i);
+ stmt.execute(sql);
+ }
+ conn.commit();
+
+ // pk1 and pk2, only needed rows are scanned, 79 rows
+ long count1 = countRowsScannedFromSql(stmt,
+ "SELECT A,B,Z FROM " + tableName + " WHERE A >= 2 AND B >= 3");
+ assertEquals(79, count1);
+
+ // pk2 only (no bound on the leading pk column), all rows
+ long count2 = countRowsScannedFromSql(stmt,
+ "SELECT A,B,Z FROM " + tableName + " WHERE B >= 3");
+ assertEquals(100, count2);
+
+ // non-pk, all rows
+ long count3 = countRowsScannedFromSql(stmt,
+ "SELECT A,B,Z FROM " + tableName + " WHERE Z >= 7");
+ assertEquals(100, count3);
+
+ // ungrouped aggregate, pk2 only, all rows
+ long count4 = countRowsScannedFromSql(stmt, "SELECT SUM(A) FROM " +
tableName
+ + " WHERE B >= 3");
+ assertEquals(100, count4);
+
+ // pk1 and pk2, group by, same 79 rows as the plain select
+ long count5 = countRowsScannedFromSql(stmt, "SELECT B, SUM(A), SUM(Z)
FROM " + tableName
+ + " WHERE A >= 2 AND B >= 3 GROUP BY B");
+ assertEquals(79, count5);
+
+ // pk1 and pk2, group by, ordered, same 79 rows
+ long count6 = countRowsScannedFromSql(stmt, "SELECT B, SUM(A), SUM(Z)
FROM " + tableName
+ + " WHERE A >= 2 AND B >= 3 GROUP BY B ORDER BY B DESC");
+ assertEquals(79, count6);
+ }
+
+ @Test
+ public void testQueryWithDeleteMarkers() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ String tableName = generateUniqueName();
+ PhoenixStatement stmt =
conn.createStatement().unwrap(PhoenixStatement.class);
+ stmt.execute("CREATE TABLE " + tableName
+ + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
+ for (int i = 1; i <= 100; i++) { // 100 rows with A = Z = i
+ String sql = String.format("UPSERT INTO %s VALUES (%d, %d)",
tableName, i, i);
+ stmt.execute(sql);
+ }
+ conn.commit();
+ String selectQuery = "SELECT A,Z FROM " + tableName + " LIMIT 1";
+ for (int i=10; i<=100; i=i+10) { // each round deletes every remaining row with A < i
+ stmt.execute("DELETE FROM " + tableName + " WHERE A < " + i);
+ conn.commit();
+ long count = countRowsScannedFromSql(stmt, selectQuery);
+ assertEquals(i, count); // i = (i - 1) deleted keys + the one live row returned; presumably deleted rows are still counted - confirm
+ }
+ }
+
+ @Test
+ public void testQueryIndex() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ PhoenixStatement stmt =
conn.createStatement().unwrap(PhoenixStatement.class);
+ stmt.execute("CREATE TABLE " + tableName
+ + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
+ stmt.execute("CREATE INDEX " + indexName + " ON " + tableName + "(Z)
INCLUDE (A)");
+ for (int i = 1; i <= 100; i++) { // 100 rows with A = Z = i
+ String sql = String.format("UPSERT INTO %s VALUES (%d, %d)",
tableName, i, i);
+ stmt.execute(sql);
+ }
+ conn.commit();
+ String selectQuery = "SELECT A FROM " + tableName + " WHERE Z > 49 AND
Z < 71";
+ long count = countRowsScannedFromSql(stmt, selectQuery);
+ assertEquals(21, count); // Z from 50 to 70 inclusive, 21 rows
+ Assert.assertEquals(indexName,
stmt.getQueryPlan().getTableRef().getTable().getTableName().toString()); // the plan's table must be the index
+ }
+
+ @Test
+ public void testJoin() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ String tableName1 = generateUniqueName();
+ String tableName2 = generateUniqueName();
+ PhoenixStatement stmt =
conn.createStatement().unwrap(PhoenixStatement.class);
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName1
+ + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, "
+ + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))");
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName2
+ + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, "
+ + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))");
+ // the join below matches on table1.B = table2.A
+ for (int i = 1; i <= 100; i++) {
+ // table1.B in [51, 150], table2.A in [1, 100]
+ String sql1 = String
+ .format("UPSERT INTO %s VALUES (%d, %d, %d)", tableName1,
i, i + 50, i);
+ stmt.execute(sql1);
+ String sql2 = String.format("UPSERT INTO %s VALUES (%d, %d, %d)",
tableName2, i, i, i);
+ stmt.execute(sql2);
+ }
+
+ conn.commit();
+
+ // table1, leading primary key 40 to 100, 61 rows
+ long count1 = countRowsScannedFromSql(stmt,
+ "SELECT * FROM " + tableName1 + " WHERE A >= 40");
+ assertEquals(61, count1);
+
+ // table2, filter on the second pk column only, all rows
+ long count2 = countRowsScannedFromSql(stmt,
+ "SELECT * FROM " + tableName2 + " WHERE B >= 20");
+ assertEquals(100, count2);
+
+ // join of the two filtered sub-queries above, 61 + 100 = 161 rows
+ String sqlJoin = "SELECT X.K, X.VX, Y.VY FROM ( SELECT B AS K, A AS VX
FROM " + tableName1
+ + " WHERE A >= 40) X JOIN (SELECT A AS K, B AS VY FROM " +
tableName2
+ + " WHERE B >= 20) Y ON X.K=Y.K";
+ long count3 = countRowsScannedFromSql(stmt, sqlJoin);
+ assertEquals(161, count3);
+ }
+
+ @Test
+ public void testUnionAll() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ String tableName1 = generateUniqueName();
+ String tableName2 = generateUniqueName();
+ PhoenixStatement stmt =
conn.createStatement().unwrap(PhoenixStatement.class);
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName1
+ + " (A UNSIGNED_LONG NOT NULL, Z UNSIGNED_LONG, CONSTRAINT pk
PRIMARY KEY (A))");
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName2
+ + " (B UNSIGNED_LONG NOT NULL, Z UNSIGNED_LONG, CONSTRAINT pk
PRIMARY KEY (B))");
+ for (int i = 1; i <= 100; i++) { // both tables get keys 1..100 with Z = key
+ String sql1 = String.format("UPSERT INTO %s VALUES (%d, %d)",
tableName1, i, i);
+ stmt.execute(sql1);
+ String sql2 = String.format("UPSERT INTO %s VALUES (%d, %d)",
tableName2, i, i);
+ stmt.execute(sql2);
+ }
+
+ conn.commit();
+
+ // table1, primary key 40 to 100, 61 rows
+ long count1 = countRowsScannedFromSql(stmt,
+ "SELECT A, Z FROM " + tableName1 + " WHERE A >= 40");
+ assertEquals(61, count1);
+
+ // table2, primary key 20 to 100, 81 rows
+ long count2 = countRowsScannedFromSql(stmt,
+ "SELECT B, Z FROM " + tableName2 + " WHERE B >= 20");
+ assertEquals(81, count2);
+
+ // union all of the two scans above, 61 + 81 = 142 rows
+ String sqlUnionAll = "SELECT SUM(Z) FROM ( SELECT Z FROM " + tableName1
+ + " WHERE A >= 40 UNION ALL SELECT Z FROM " + tableName2 + "
WHERE B >= 20)";
+ long count3 = countRowsScannedFromSql(stmt, sqlUnionAll);
+ assertEquals(142, count3);
+
+ // union all then group by, same 142 rows
+ String sqlUnionAllGroupBy = "SELECT K, SUM(Z) FROM ( SELECT A AS K, Z
FROM " + tableName1
+ + " WHERE A >= 40 UNION ALL SELECT B AS K, Z FROM " +
tableName2
+ + " WHERE B >= 20) GROUP BY K";
+ long count4 = countRowsScannedFromSql(stmt, sqlUnionAllGroupBy);
+ assertEquals(142, count4);
+ }
+
+ private long countRowsScannedFromSql(Statement stmt, String sql) throws
SQLException {
+ ResultSet rs = stmt.executeQuery(sql);
+ while (rs.next()) {
+ // drain every row; the metrics are read only after the loop
+ }
+ return getRowsScanned(rs); // -1 when the metric is unavailable
+ }
+
+ private long getRowsScanned(ResultSet rs) throws SQLException {
+ if (!(rs instanceof PhoenixResultSet)) {
+ return -1; // not a Phoenix result set, so no read metrics to inspect
+ }
+ Map<String, Map<MetricType, Long>> metrics =
PhoenixRuntime.getRequestReadMetricInfo(rs);
+
+ long sum = 0; // COUNT_ROWS_SCANNED added up over every entry of the metrics map
+ boolean valid = false; // becomes true once any entry reports COUNT_ROWS_SCANNED
+ for (Map.Entry<String, Map<MetricType, Long>> entry :
metrics.entrySet()) {
+ Long val = entry.getValue().get(MetricType.COUNT_ROWS_SCANNED);
+ if (val != null) {
+ sum += val.longValue();
+ valid = true;
+ }
+ }
+ if (valid) {
+ return sum;
+ } else {
+ return -1; // no entry reported the metric
+ }
+ }
+}
\ No newline at end of file
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index 0dfc13e385..eaf4674b67 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
@@ -173,6 +174,10 @@ public class NonTxIndexBuilderTest extends
BaseConnectionlessQueryTest {
}
return false; // indicate no more results
}
+
+ public boolean next(List<Cell> result, ScannerContext
scannerContext) throws IOException {
+ return next(result); // scannerContext is ignored; delegates to the single-argument next
+ }
};
}