Repository: phoenix Updated Branches: refs/heads/3.0 c3067a754 -> 9729c171c
PHOENIX-1034 Move validate/reserve of sequences into query compile Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9729c171 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9729c171 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9729c171 Branch: refs/heads/3.0 Commit: 9729c171cbee7a06b7aa5e6be6ca01c3fbfa0da8 Parents: c3067a7 Author: James Taylor <jtay...@salesforce.com> Authored: Sun Jun 8 14:02:35 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Sun Jun 8 14:02:35 2014 -0700 ---------------------------------------------------------------------- .../DefaultParallelIteratorsRegionSplitterIT.java | 4 +++- ...SkipRangeParallelIteratorRegionSplitterIT.java | 4 +++- .../phoenix/compile/CreateIndexCompiler.java | 2 +- .../phoenix/compile/CreateTableCompiler.java | 2 +- .../org/apache/phoenix/compile/JoinCompiler.java | 2 +- .../apache/phoenix/compile/PostDDLCompiler.java | 3 ++- .../org/apache/phoenix/compile/QueryCompiler.java | 14 ++++++++------ .../apache/phoenix/compile/StatementContext.java | 18 ++++++++++++------ .../apache/phoenix/compile/UpsertCompiler.java | 2 +- .../apache/phoenix/optimize/QueryOptimizer.java | 5 +++-- .../iterate/AggregateResultScannerTest.java | 4 +++- 11 files changed, 38 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java index 152b955..3ebbc8b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.iterate.DefaultParallelIteratorRegionSplitter; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -61,7 +62,8 @@ public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIterat TableRef tableRef = getTableRef(conn, ts); PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); final List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes()); - StatementContext context = new StatementContext(new PhoenixStatement(pconn), null, scan); + PhoenixStatement statement = new PhoenixStatement(pconn); + StatementContext context = new StatementContext(statement, null, scan, new SequenceManager(statement)); DefaultParallelIteratorRegionSplitter splitter = new DefaultParallelIteratorRegionSplitter(context, tableRef, HintNode.EMPTY_HINT_NODE) { @Override protected List<HRegionLocation> getAllRegions() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java index 20ce768..d4a40f0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.iterate.SkipRangeParallelIteratorRegionSplitter; @@ -356,7 +357,8 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged }; PhoenixConnection connection = DriverManager.getConnection(getUrl(), TEST_PROPERTIES).unwrap(PhoenixConnection.class); - StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver, scan); + PhoenixStatement statement = new PhoenixStatement(connection); + StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); context.setScanRanges(scanRanges); SkipRangeParallelIteratorRegionSplitter splitter = SkipRangeParallelIteratorRegionSplitter.getInstance(context, tableRef, HintNode.EMPTY_HINT_NODE); List<KeyRange> keyRanges = splitter.getSplits(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java index 0c6b808..bbd7154 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java @@ -44,7 +44,7 @@ public class CreateIndexCompiler { final PhoenixConnection connection = statement.getConnection(); final ColumnResolver resolver = FromCompiler.getResolverForMutation(create, connection); Scan scan = new Scan(); - final StatementContext context = new StatementContext(statement, resolver, scan); + final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); ExpressionCompiler expressionCompiler = new ExpressionCompiler(context); List<ParseNode> splitNodes = create.getSplitNodes(); final byte[][] splits = new byte[splitNodes.size()][]; http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java index 693d3d6..7794416 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java @@ -79,7 +79,7 @@ public class CreateTableCompiler { PTable parentToBe = null; ViewType viewTypeToBe = null; Scan scan = new Scan(); - final StatementContext context = new StatementContext(statement, resolver, scan); + final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); // TODO: support any statement for a VIEW instead of just a WHERE clause ParseNode whereNode = create.getWhereClause(); String viewStatementToBe = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index 077f1ac..50acaf1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -1115,7 +1115,7 @@ public class JoinCompiler { if (groupByTableRef != null || orderByTableRef != null) { QueryCompiler compiler = new QueryCompiler(statement, select, resolver); List<Object> binds = statement.getParameters(); - StatementContext ctx = new StatementContext(statement, resolver, new Scan()); + StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement)); QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false); TableRef table = plan.getTableRef(); if (groupByTableRef != null && !groupByTableRef.equals(table)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index ccf805f..294942f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -134,7 +134,8 @@ public class PostDDLCompiler { return new ColumnRef(tableRef, column.getPosition()); } }; - StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver, scan); + PhoenixStatement statement = new PhoenixStatement(connection); + StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); ScanUtil.setTimeRange(scan, timestamp); if (emptyCF != null) { scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 1f39ad9..20c0acd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -82,18 +82,20 @@ public class QueryCompiler { private final SelectStatement select; private final List<? extends PDatum> targetColumns; private final ParallelIteratorFactory parallelIteratorFactory; + private final SequenceManager sequenceManager; public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException { - this(statement, select, resolver, Collections.<PDatum>emptyList(), null); + this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement)); } - public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException { + public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException { this.statement = statement; this.select = select; this.resolver = resolver; this.scan = new Scan(); this.targetColumns = targetColumns; this.parallelIteratorFactory = parallelIteratorFactory; + this.sequenceManager = sequenceManager; if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) { this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE); } @@ -117,12 +119,12 @@ public class QueryCompiler { public QueryPlan compile() throws SQLException{ SelectStatement select = this.select; List<Object> binds = statement.getParameters(); - StatementContext context = new StatementContext(statement, resolver, scan); + StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager); if (select.isJoin()) { select = JoinCompiler.optimize(statement, select, resolver); if (this.select != select) { ColumnResolver resolver = FromCompiler.getResolverForQuery(select, statement.getConnection()); - context = new StatementContext(statement, resolver, scan); + context = new StatementContext(statement, resolver, scan, sequenceManager); } JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver()); return compileJoinQuery(context, binds, joinTable, false); @@ -188,7 +190,7 @@ public class QueryCompiler { for (int i = 0; i < count; i++) { JoinSpec joinSpec = joinSpecs.get(i); Scan subScan = ScanUtil.newScan(originalScan); - StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan); + StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); joinPlans[i] = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true); ColumnResolver resolver = subContext.getResolver(); clientProjectors[i] = subContext.getClientTupleProjector(); @@ -242,7 +244,7 @@ public class QueryCompiler { Table rhsTable = rhsJoinTable.getTable(); JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); Scan subScan = ScanUtil.newScan(originalScan); - StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan); + StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true); ColumnResolver lhsResolver = lhsCtx.getResolver(); TupleProjector clientProjector = lhsCtx.getClientTupleProjector(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index 27f9e82..4c907d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -28,9 +28,15 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.join.TupleProjector; -import org.apache.phoenix.query.*; -import org.apache.phoenix.schema.*; -import org.apache.phoenix.util.*; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.DateUtil; +import org.apache.phoenix.util.NumberUtil; +import org.apache.phoenix.util.ScanUtil; /** @@ -64,13 +70,14 @@ public class StatementContext { private TupleProjector clientTupleProjector; public StatementContext(PhoenixStatement statement) { - this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan()); + this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement)); } - public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan) { + public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) { this.statement = statement; this.resolver = resolver; this.scan = scan; + this.sequences = seqManager; this.binds = new BindManager(statement.getParameters()); this.aggregates = new AggregationManager(); this.expressions = new ExpressionManager(); @@ -81,7 +88,6 @@ public class StatementContext { this.numberFormat = connection.getQueryServices().getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT); this.tempPtr = new ImmutableBytesWritable(); this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null; - this.sequences = new SequenceManager(statement); this.whereConditionColumns = new ArrayList<Pair<byte[],byte[]>>(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 7295740..60e7bb8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -642,7 +642,7 @@ public class UpsertCompiler { final int nodeIndexOffset = nodeIndex; // Allocate array based on size of all columns in table, // since some values may not be set (if they're nullable). - final StatementContext context = new StatementContext(statement, resolver, new Scan()); + final StatementContext context = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement)); UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context); final List<Expression> constantExpressions = Lists.newArrayListWithExpectedSize(valueNodes.size()); // First build all the expressions, as with sequences we want to collect them all first http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 0a01152..f0c43b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -29,6 +29,7 @@ import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexStatementRewriter; import org.apache.phoenix.compile.QueryCompiler; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.HintNode; @@ -70,7 +71,7 @@ public class QueryOptimizer { } public QueryPlan optimize(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException { - QueryCompiler compiler = new QueryCompiler(statement, select, resolver, targetColumns, parallelIteratorFactory); + QueryCompiler compiler = new QueryCompiler(statement, select, resolver, targetColumns, parallelIteratorFactory, new SequenceManager(statement)); QueryPlan dataPlan = compiler.compile(); return optimize(dataPlan, statement, targetColumns, parallelIteratorFactory); } @@ -193,7 +194,7 @@ public class QueryOptimizer { ColumnResolver resolver = FromCompiler.getResolverForQuery(indexSelect, statement.getConnection()); // Check index state of now potentially updated index table to make sure it's active if (PIndexState.ACTIVE.equals(resolver.getTables().get(0).getTable().getIndexState())) { - QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory); + QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager()); QueryPlan plan = compiler.compile(); // Checking number of columns handles the wildcard cases correctly, as in that case the index // must contain all columns from the data table to be able to be used. http://git-wip-us.apache.org/repos/asf/phoenix/blob/9729c171/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java index 263ccd8..2edd588 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.AggregationManager; +import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -88,7 +89,8 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest { }; PhoenixConnection pconn = DriverManager.getConnection(getUrl(), TEST_PROPERTIES).unwrap(PhoenixConnection.class); - StatementContext context = new StatementContext(new PhoenixStatement(pconn), null, new Scan()); + PhoenixStatement statement = new PhoenixStatement(pconn); + StatementContext context = new StatementContext(statement, null, new Scan(), new SequenceManager(statement)); AggregationManager aggregationManager = context.getAggregationManager(); SumAggregateFunction func = new SumAggregateFunction(Arrays.<Expression>asList(new KeyValueColumnExpression(new PLongColumn() { @Override