PHOENIX-2697 Provide a SERIAL hint.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0b1a180f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0b1a180f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0b1a180f Branch: refs/heads/master Commit: 0b1a180f1d9acc23cf58ecfc84a67aac110160cd Parents: 61fa462 Author: Lars Hofhansl <la...@apache.org> Authored: Sat Feb 20 19:36:35 2016 -0800 Committer: Lars Hofhansl <la...@apache.org> Committed: Sat Feb 20 19:36:35 2016 -0800 ---------------------------------------------------------------------- .../apache/phoenix/execute/AggregatePlan.java | 16 ++++++++++----- .../org/apache/phoenix/execute/ScanPlan.java | 14 ++++++++----- .../apache/phoenix/iterate/ExplainTable.java | 2 +- .../apache/phoenix/iterate/SerialIterators.java | 3 ++- .../java/org/apache/phoenix/parse/HintNode.java | 4 ++++ .../compile/StatementHintsCompilationTest.java | 21 +++++++++++++++----- 6 files changed, 43 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 3de4e68..73a995c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -48,10 +48,13 @@ import org.apache.phoenix.iterate.ParallelIterators; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.ResultIterators; import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.iterate.SerialIterators; import org.apache.phoenix.iterate.SpoolingResultIterator; import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator; import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -194,16 +197,19 @@ public class AggregatePlan extends BaseQueryPlan { context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, PInteger.INSTANCE.toBytes(limit)); } } - ParallelIterators parallelIterators = new ParallelIterators(this, null, wrapParallelIteratorFactory()); - splits = parallelIterators.getSplits(); - scans = parallelIterators.getScans(); + ResultIterators iterators = statement.getHint().hasHint(HintNode.Hint.SERIAL) ? + new SerialIterators(this, null, wrapParallelIteratorFactory(), scanGrouper) : + new ParallelIterators(this, null, wrapParallelIteratorFactory()); + + splits = iterators.getSplits(); + scans = iterators.getScans(); AggregatingResultIterator aggResultIterator; // No need to merge sort for ungrouped aggregation if (groupBy.isEmpty()) { - aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(parallelIterators), aggregators); + aggResultIterator = new UngroupedAggregatingResultIterator(new ConcatResultIterator(iterators), aggregators); } else { - aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(parallelIterators), aggregators); + aggResultIterator = new GroupedAggregatingResultIterator(new MergeSortRowKeyResultIterator(iterators), aggregators); } if (having != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index f4c570c..d51e6c8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -46,6 +46,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.SerialIterators; import org.apache.phoenix.iterate.SpoolingResultIterator; import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; @@ -85,7 +86,7 @@ public class ScanPlan extends BaseQueryPlan { private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory != null ? parallelIteratorFactory : - buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter); + buildResultIteratorFactory(context, statement, table, orderBy, limit, allowPageFilter), dynamicFilter); this.allowPageFilter = allowPageFilter; if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( @@ -94,8 +95,11 @@ public class ScanPlan extends BaseQueryPlan { } } - private static boolean isSerial(StatementContext context, + private static boolean isSerial(StatementContext context, FilterableStatement statement, TableRef tableRef, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException { + if (statement.getHint().hasHint(HintNode.Hint.SERIAL)) { + return true; + } Scan scan = context.getScan(); /* * If a limit is provided and we have no filter, run the scan serially when we estimate that @@ -137,10 +141,10 @@ public class ScanPlan extends BaseQueryPlan { return isSerial; } - private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, + private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, FilterableStatement statement, TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException { - if (isSerial(context, table, orderBy, limit, allowPageFilter) + if (isSerial(context, statement, table, orderBy, limit, allowPageFilter) || ScanUtil.isRoundRobinPossible(orderBy, context) || ScanUtil.isPacingScannersPossible(context)) { return ParallelIteratorFactory.NOOP_FACTORY; @@ -189,7 +193,7 @@ public class ScanPlan extends BaseQueryPlan { * limit is provided, run query serially. */ boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); - boolean isSerial = isSerial(context, tableRef, orderBy, limit, allowPageFilter); + boolean isSerial = isSerial(context, statement, tableRef, orderBy, limit, allowPageFilter); Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit; ResultIterators iterators; if (isSerial) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java index 4a71483..b319914 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java @@ -110,7 +110,7 @@ public abstract class ExplainTable { buf.append("TIMELINE-CONSISTENCY "); } if (hint.hasHint(Hint.SMALL)) { - buf.append("SMALL "); + buf.append(Hint.SMALL).append(" "); } if (OrderBy.REV_ROW_KEY_ORDER_BY.equals(orderBy)) { buf.append("REVERSE "); http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index a221ba2..60b9f44 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; +import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ScanUtil; @@ -54,7 +55,7 @@ public class SerialIterators extends BaseResultIterators { public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper) throws SQLException { super(plan, perScanLimit, scanGrouper); - Preconditions.checkArgument(perScanLimit != null); // must be a limit specified + Preconditions.checkArgument(perScanLimit != null || plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL)); // must be a limit specified or a SERIAL hint this.iteratorFactory = iteratorFactory; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java index ce20208..6d8451b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java @@ -100,6 +100,10 @@ public class HintNode { * Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation. */ SMALL, + /** + * Enforces a serial scan. + */ + SERIAL, }; private final Map<Hint,String> hints; http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1a180f/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java index 7f8adfa..9adf414 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java @@ -61,17 +61,17 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest { return filter instanceof SkipScanFilter; } - private static StatementContext compileStatement(String query) throws SQLException { + private static QueryPlan compileStatement(String query) throws SQLException { return compileStatement(query, Collections.emptyList(), null); } - private static StatementContext compileStatement(String query, List<Object> binds, Integer limit) throws SQLException { + private static QueryPlan compileStatement(String query, List<Object> binds, Integer limit) throws SQLException { PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query); TestUtil.bindParams(pstmt, binds); QueryPlan plan = pstmt.compileQuery(); assertEquals(limit, plan.getLimit()); - return plan.getContext(); + return plan; } @Test @@ -80,7 +80,7 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest { // A where clause without the first column usually compiles into a range scan. String query = "SELECT /*+ SKIP_SCAN */ * FROM atable WHERE entity_id='" + id + "'"; - Scan scan = compileStatement(query).getScan(); + Scan scan = compileStatement(query).getContext().getScan(); assertTrue("The first filter should be SkipScanFilter.", usingSkipScan(scan)); } @@ -88,7 +88,7 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest { public void testSelectForceRangeScan() throws Exception { String query = "SELECT /*+ RANGE_SCAN */ * FROM atable WHERE organization_id in (" + "'000000000000001', '000000000000002', '000000000000003', '000000000000004')"; - Scan scan = compileStatement(query).getScan(); + Scan scan = compileStatement(query).getContext().getScan(); // Verify that it is not using SkipScanFilter. assertFalse("The first filter should not be SkipScanFilter.", usingSkipScan(scan)); } @@ -103,4 +103,15 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest { " SERVER TOP 100 ROWS SORTED BY [ORGANIZATION_ID, PARENT_ID, CREATED_DATE DESC, ENTITY_HISTORY_ID]\n" + "CLIENT MERGE SORT",QueryUtil.getExplainPlan(rs)); } + + @Test + public void testSerialHint() throws Exception { + // test ScanPlan + String query = "SELECT /*+ SERIAL */ COUNT(*) FROM atable"; + assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL")); + + // test AggregatePlan + query = "SELECT /*+ SERIAL */ * FROM atable"; + assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL")); + } }