Fix sync errors
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c8c33a91 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c8c33a91 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c8c33a91 Branch: refs/heads/calcite Commit: c8c33a918e4ca66cbcc2ffa3e15107f53a60f58b Parents: c3613fd 13ae3f3 Author: maryannxue <maryann....@gmail.com> Authored: Fri Oct 28 14:26:27 2016 -0700 Committer: maryannxue <maryann....@gmail.com> Committed: Fri Oct 28 14:26:27 2016 -0700 ---------------------------------------------------------------------- bin/queryserver.py | 2 +- .../src/build/components/all-common-jars.xml | 10 +- phoenix-client/pom.xml | 6 + .../phoenix/end2end/AbsFunctionEnd2EndIT.java | 6 +- .../phoenix/end2end/AggregateQueryIT.java | 2 +- .../phoenix/end2end/ArithmeticQueryIT.java | 11 +- .../phoenix/end2end/ArrayFillFunctionIT.java | 6 +- .../phoenix/end2end/DefaultColumnValueIT.java | 1070 ++++++++++++++++++ .../phoenix/end2end/ExecuteStatementsIT.java | 12 +- .../phoenix/end2end/FirstValueFunctionIT.java | 60 +- .../apache/phoenix/end2end/GroupByCaseIT.java | 34 +- .../org/apache/phoenix/end2end/HashJoinIT.java | 32 +- .../apache/phoenix/end2end/IndexExtendedIT.java | 7 +- .../phoenix/end2end/LastValueFunctionIT.java | 148 +-- .../phoenix/end2end/NthValueFunctionIT.java | 48 +- .../phoenix/end2end/OnDuplicateKeyIT.java | 523 +++++++++ .../phoenix/end2end/ProductMetricsIT.java | 68 +- .../org/apache/phoenix/end2end/QueryMoreIT.java | 107 +- .../phoenix/end2end/QueryWithOffsetIT.java | 8 + .../phoenix/end2end/RoundFloorCeilFuncIT.java | 17 +- .../phoenix/end2end/SignFunctionEnd2EndIT.java | 6 +- .../apache/phoenix/end2end/SortMergeJoinIT.java | 40 +- .../org/apache/phoenix/end2end/SortOrderIT.java | 6 +- .../phoenix/end2end/StatsCollectorIT.java | 53 +- .../end2end/SubqueryUsingSortMergeJoinIT.java | 2 +- .../org/apache/phoenix/end2end/UnionAllIT.java | 6 +- 
.../apache/phoenix/end2end/UpsertSelectIT.java | 58 +- .../apache/phoenix/end2end/UpsertValuesIT.java | 10 +- .../org/apache/phoenix/end2end/UseSchemaIT.java | 39 + .../phoenix/end2end/VariableLengthPKIT.java | 46 +- .../index/ChildViewsUseParentViewIndexIT.java | 17 + .../phoenix/end2end/index/IndexTestUtil.java | 6 +- .../phoenix/end2end/index/LocalIndexIT.java | 21 + .../end2end/index/MutableIndexFailureIT.java | 13 +- .../phoenix/end2end/index/MutableIndexIT.java | 5 +- .../phoenix/end2end/index/ViewIndexIT.java | 260 ++++- .../phoenix/iterate/MockResultIterator.java | 2 +- .../org/apache/phoenix/tx/TransactionIT.java | 15 + phoenix-core/src/main/antlr3/PhoenixSQL.g | 32 +- .../apache/phoenix/calcite/TableMapping.java | 8 +- .../calcite/rel/PhoenixRelImplementor.java | 3 +- .../calcite/rel/PhoenixRelImplementorImpl.java | 3 +- .../phoenix/calcite/rel/PhoenixTableModify.java | 15 +- .../phoenix/compile/CreateTableCompiler.java | 28 +- .../apache/phoenix/compile/DeleteCompiler.java | 14 +- .../apache/phoenix/compile/PostDDLCompiler.java | 10 +- .../apache/phoenix/compile/SequenceManager.java | 3 + .../apache/phoenix/compile/UpsertCompiler.java | 165 ++- .../apache/phoenix/compile/WhereOptimizer.java | 27 +- .../coprocessor/BaseScannerRegionObserver.java | 1 + .../coprocessor/MetaDataEndpointImpl.java | 17 +- .../phoenix/coprocessor/MetaDataProtocol.java | 8 +- .../coprocessor/MetaDataRegionObserver.java | 101 +- .../coprocessor/SequenceRegionObserver.java | 2 +- .../UngroupedAggregateRegionObserver.java | 514 +++++---- .../exception/DataExceedsCapacityException.java | 14 +- .../phoenix/exception/SQLExceptionCode.java | 9 +- .../phoenix/exception/SQLExceptionInfo.java | 9 +- .../apache/phoenix/execute/MutationState.java | 32 +- .../org/apache/phoenix/execute/ScanPlan.java | 6 +- .../apache/phoenix/execute/TupleProjector.java | 3 +- .../org/apache/phoenix/execute/UnionPlan.java | 3 +- .../phoenix/expression/ExpressionType.java | 4 +- 
.../function/ArrayConcatFunction.java | 5 +- .../function/ArrayModifierFunction.java | 8 +- .../function/DefaultValueExpression.java | 91 ++ .../phoenix/filter/ColumnProjectionFilter.java | 29 +- .../org/apache/phoenix/hbase/index/Indexer.java | 92 +- .../hbase/index/builder/BaseIndexBuilder.java | 14 +- .../hbase/index/builder/IndexBuildManager.java | 10 + .../hbase/index/builder/IndexBuilder.java | 29 +- .../phoenix/hbase/index/covered/IndexCodec.java | 1 - .../hbase/index/util/IndexManagementUtil.java | 13 +- .../hbase/index/util/KeyValueBuilder.java | 15 +- .../apache/phoenix/index/IndexMaintainer.java | 2 +- .../phoenix/index/PhoenixIndexBuilder.java | 319 ++++++ .../phoenix/index/PhoenixIndexMetaData.java | 7 +- .../phoenix/iterate/BaseResultIterators.java | 89 +- .../iterate/MergeSortTopNResultIterator.java | 3 + .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 6 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 11 +- .../org/apache/phoenix/parse/ColumnDef.java | 79 ++ .../phoenix/parse/CreateSchemaStatement.java | 4 +- .../phoenix/parse/CreateTableStatement.java | 13 + .../apache/phoenix/parse/FunctionParseNode.java | 73 +- .../apache/phoenix/parse/ParseNodeFactory.java | 21 +- .../apache/phoenix/parse/UpsertStatement.java | 10 +- .../phoenix/parse/UseSchemaStatement.java | 4 +- .../phoenix/query/ConnectionQueryServices.java | 20 +- .../query/ConnectionQueryServicesImpl.java | 122 +- .../query/ConnectionlessQueryServicesImpl.java | 28 +- .../query/DelegateConnectionQueryServices.java | 12 +- .../apache/phoenix/query/GuidePostsCache.java | 231 ++++ .../apache/phoenix/query/QueryConstants.java | 3 +- .../apache/phoenix/query/TableStatsCache.java | 192 ---- .../org/apache/phoenix/schema/ColumnRef.java | 42 +- .../apache/phoenix/schema/DelegateColumn.java | 10 + .../phoenix/schema/DelegateSQLException.java | 62 + .../apache/phoenix/schema/DelegateTable.java | 18 +- .../apache/phoenix/schema/MetaDataClient.java | 137 +-- 
.../org/apache/phoenix/schema/PColumnImpl.java | 12 +- .../apache/phoenix/schema/PMetaDataImpl.java | 2 +- .../java/org/apache/phoenix/schema/PRow.java | 11 +- .../java/org/apache/phoenix/schema/PTable.java | 6 +- .../org/apache/phoenix/schema/PTableImpl.java | 144 ++- .../apache/phoenix/schema/TableProperty.java | 7 + .../stats/DefaultStatisticsCollector.java | 145 ++- .../phoenix/schema/stats/GuidePostsInfo.java | 77 +- .../schema/stats/GuidePostsInfoBuilder.java | 4 + .../phoenix/schema/stats/GuidePostsKey.java | 84 ++ .../schema/stats/NoOpStatisticsCollector.java | 9 +- .../phoenix/schema/stats/PTableStats.java | 57 - .../phoenix/schema/stats/PTableStatsImpl.java | 115 -- .../schema/stats/StatisticsCollector.java | 12 +- .../stats/StatisticsCollectorFactory.java | 19 +- .../phoenix/schema/stats/StatisticsScanner.java | 3 +- .../phoenix/schema/stats/StatisticsUtil.java | 135 +-- .../phoenix/schema/stats/StatisticsWriter.java | 6 +- .../phoenix/schema/types/PArrayDataType.java | 11 +- .../apache/phoenix/schema/types/PBinary.java | 334 +++--- .../phoenix/schema/types/PBinaryBase.java | 17 + .../org/apache/phoenix/schema/types/PChar.java | 15 +- .../apache/phoenix/schema/types/PDataType.java | 5 +- .../apache/phoenix/schema/types/PDecimal.java | 669 +++++------ .../apache/phoenix/schema/types/PVarbinary.java | 248 ++-- .../apache/phoenix/schema/types/PVarchar.java | 268 ++--- .../org/apache/phoenix/util/ExpressionUtil.java | 1 - .../java/org/apache/phoenix/util/IndexUtil.java | 2 +- .../org/apache/phoenix/util/MetaDataUtil.java | 44 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 34 + .../java/org/apache/phoenix/util/ScanUtil.java | 4 + .../org/apache/phoenix/util/SchemaUtil.java | 11 +- .../org/apache/phoenix/util/UpgradeUtil.java | 2 +- .../phoenix/compile/JoinQueryCompilerTest.java | 2 +- .../phoenix/compile/QueryCompilerTest.java | 352 ++++++ .../phoenix/compile/QueryOptimizerTest.java | 15 +- .../compile/StatementHintsCompilationTest.java | 2 +- 
.../phoenix/compile/WhereCompilerTest.java | 2 +- .../phoenix/compile/WhereOptimizerTest.java | 114 ++ .../phoenix/execute/CorrelatePlanTest.java | 6 +- .../phoenix/filter/SkipScanBigFilterTest.java | 25 +- .../jdbc/PhoenixResultSetMetadataTest.java | 4 +- .../java/org/apache/phoenix/query/BaseTest.java | 2 +- .../PhoenixStatsCacheRemovalListenerTest.java | 2 +- .../org/apache/phoenix/query/QueryPlanTest.java | 23 +- .../org/apache/phoenix/schema/MutationTest.java | 54 + .../apache/phoenix/util/PhoenixRuntimeTest.java | 44 + .../java/org/apache/phoenix/util/TestUtil.java | 35 +- .../java/org/apache/phoenix/pherf/Pherf.java | 6 + .../phoenix/pherf/workload/WriteWorkload.java | 49 +- phoenix-pig/pom.xml | 263 +++++ .../phoenix/end2end/QueryServerBasicsIT.java | 9 +- .../phoenix/end2end/QueryServerThread.java | 10 +- .../apache/phoenix/queryserver/server/Main.java | 333 ------ .../phoenix/queryserver/server/QueryServer.java | 340 ++++++ .../server/PhoenixDoAsCallbackTest.java | 2 +- pom.xml | 2 +- 157 files changed, 7088 insertions(+), 2759 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java index a00facf,0000000..216920f mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java @@@ -1,366 -1,0 +1,370 @@@ +package org.apache.phoenix.calcite; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import 
org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.TupleProjectionCompiler; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.ProjectedColumn; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class TableMapping { + private final TableRef tableRef; + private final TableRef dataTableRef; + private final List<PColumn> mappedColumns; + private final int extendedColumnsOffset; + private final TableRef extendedTableRef; + 
+ public TableMapping(PTable table) { + this.tableRef = new TableRef(table); + this.dataTableRef = null; + this.mappedColumns = getMappedColumns(table); + this.extendedColumnsOffset = mappedColumns.size(); + this.extendedTableRef = null; + } + + public TableMapping(TableRef tableRef, TableRef dataTableRef, boolean extend) throws SQLException { + this.tableRef = tableRef; + this.dataTableRef = dataTableRef; + if (!extend) { + this.mappedColumns = getMappedColumns(tableRef.getTable()); + this.extendedColumnsOffset = mappedColumns.size(); + this.extendedTableRef = null; + } else { + this.mappedColumns = Lists.newArrayList(); + this.mappedColumns.addAll(getMappedColumns(tableRef.getTable())); + this.extendedColumnsOffset = mappedColumns.size(); + final PTable dataTable = dataTableRef.getTable(); + final List<PColumn> projectedColumns = getDataTableMappedColumns(dataTableRef, mappedColumns); + this.mappedColumns.addAll(projectedColumns); + PTable extendedTable = PTableImpl.makePTable(dataTable.getTenantId(), + TupleProjectionCompiler.PROJECTED_TABLE_SCHEMA, dataTable.getName(), + PTableType.PROJECTED, null, dataTable.getTimeStamp(), + dataTable.getSequenceNumber(), dataTable.getPKName(), null, + projectedColumns, null, null, Collections.<PTable>emptyList(), + dataTable.isImmutableRows(), Collections.<PName>emptyList(), null, null, + dataTable.isWALDisabled(), false, dataTable.getStoreNulls(), + dataTable.getViewType(), null, null, dataTable.rowKeyOrderOptimizable(), + dataTable.isTransactional(), dataTable.getUpdateCacheFrequency(), + dataTable.getIndexDisableTimestamp(), dataTable.isNamespaceMapped(), + dataTable.getAutoPartitionSeqName(), dataTable.isAppendOnlySchema()); + this.extendedTableRef = new TableRef(extendedTable); + } + } + + public TableRef getTableRef() { + return tableRef; + } + + public PTable getPTable() { + return tableRef.getTable(); + } + + public TableRef getDataTableRef() { + return dataTableRef; + } + + public List<PColumn> getMappedColumns() { + 
return mappedColumns; + } + + public boolean hasExtendedColumns() { + return extendedTableRef != null; + } + - public ColumnExpression newColumnExpression(int index) { ++ public Expression newColumnExpression(int index) { + ColumnRef colRef = new ColumnRef( + index < extendedColumnsOffset ? tableRef : extendedTableRef, + this.mappedColumns.get(index).getPosition()); - return colRef.newColumnExpression(); ++ try { ++ return colRef.newColumnExpression(); ++ } catch (SQLException e) { ++ throw new RuntimeException(e); ++ } + } + + public ImmutableBitSet getDefaultExtendedColumnRef() { + return ImmutableBitSet.range(extendedColumnsOffset, mappedColumns.size()); + } + + public ImmutableBitSet getExtendedColumnRef(List<RexNode> exprs) { + if (!hasExtendedColumns()) { + return ImmutableBitSet.of(); + } + + ImmutableBitSet.Builder builder = ImmutableBitSet.builder(); + for (RexNode expr : exprs) { + builder.addAll(InputFinder.analyze(expr).inputBitSet.build()); + } + for (int i = 0; i < extendedColumnsOffset; i++) { + builder.clear(i); + } + return builder.build(); + } + + public Pair<Integer, Integer> getExtendedColumnReferenceCount(ImmutableBitSet columnRef) { + Set<String> cf = Sets.newHashSet(); + int columnCount = 0; + for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) { + if (columnRef.get(i)) { + PColumn dataColumn = ((ProjectedColumn) mappedColumns.get(i)) + .getSourceColumnRef().getColumn(); + cf.add(dataColumn.getFamilyName().getString()); + columnCount++; + } + } + return new Pair<Integer, Integer>(cf.size(), columnCount); + } + + public PTable createProjectedTable(boolean retainPKColumns) { + List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); + List<PColumn> columns = retainPKColumns ? 
+ tableRef.getTable().getColumns() : mappedColumns.subList(0, extendedColumnsOffset); + for (PColumn column : columns) { + sourceColumnRefs.add(new ColumnRef(tableRef, column.getPosition())); + } + if (extendedColumnsOffset < mappedColumns.size()) { + for (PColumn column : mappedColumns.subList(extendedColumnsOffset, mappedColumns.size())) { + sourceColumnRefs.add(new ColumnRef(extendedTableRef, column.getPosition())); + } + } + + try { + return TupleProjectionCompiler.createProjectedTable(tableRef, sourceColumnRefs, retainPKColumns); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public TupleProjector createTupleProjector(boolean retainPKColumns) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (int i = 0; i < mappedColumns.size(); i++) { + if (!SchemaUtil.isPKColumn(mappedColumns.get(i)) || !retainPKColumns) { + Expression expr = newColumnExpression(i); + exprs.add(expr); + builder.addField(expr); + } + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + + public RowProjector createRowProjector() { + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + Expression expr = newColumnExpression(i); // Do not use column.position() here. 
+ columnProjectors.add(new ExpressionProjector(column.getName().getString(), tableRef.getTable().getName().getString(), expr, false)); + } + // TODO get estimate row size + return new RowProjector(columnProjectors, 0, false); + } + + public void setupScanForExtendedTable(Scan scan, ImmutableBitSet extendedColumnRef, + PhoenixConnection connection) throws SQLException { + if (extendedTableRef == null || extendedColumnRef.isEmpty()) { + return; + } + + TableRef dataTableRef = null; + List<PColumn> dataColumns = Lists.newArrayList(); + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) { + ProjectedColumn column = (ProjectedColumn) mappedColumns.get(i); + builder.addField(column); + if (extendedColumnRef.get(i)) { + dataColumns.add(column.getSourceColumnRef().getColumn()); + exprs.add(column.getSourceColumnRef().newColumnExpression()); + if (dataTableRef == null) { + dataTableRef = column.getSourceColumnRef().getTableRef(); + } + } else { + exprs.add(LiteralExpression.newConstant(null)); + } + } + if (dataColumns.isEmpty()) { + return; + } + + // Set data columns to be join back from data table. + serializeDataTableColumnsToJoin(scan, dataColumns); + // Set tuple projector of the data columns. + TupleProjector projector = new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + TupleProjector.serializeProjectorIntoScan(scan, projector, IndexUtil.INDEX_PROJECTOR); + PTable dataTable = dataTableRef.getTable(); + // Set index maintainer of the local index. + serializeIndexMaintainerIntoScan(scan, dataTable, connection); + // Set view constants if exists. 
+ serializeViewConstantsIntoScan(scan, dataTable); + } + + private static void serializeDataTableColumnsToJoin(Scan scan, List<PColumn> dataColumns) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + DataOutputStream output = new DataOutputStream(stream); + WritableUtils.writeVInt(output, dataColumns.size()); + for (PColumn column : dataColumns) { + Bytes.writeByteArray(output, column.getFamilyName().getBytes()); + Bytes.writeByteArray(output, column.getName().getBytes()); + } + scan.setAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN, stream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable, PhoenixConnection connection) throws SQLException { + PName name = getPTable().getName(); + List<PTable> indexes = Lists.newArrayListWithExpectedSize(1); + for (PTable index : dataTable.getIndexes()) { + if (index.getName().equals(name) && index.getIndexType() == IndexType.LOCAL) { + indexes.add(index); + break; + } + } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + IndexMaintainer.serialize(dataTable, ptr, indexes, connection); + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + if (dataTable.isTransactional()) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction()); + } + } + + private static void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) { + int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 
1 : 0); + int nViewConstants = 0; + if (dataTable.getType() == PTableType.VIEW) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + List<PColumn> dataPkColumns = dataTable.getPKColumns(); + for (int i = dataPosOffset; i < dataPkColumns.size(); i++) { + PColumn dataPKColumn = dataPkColumns.get(i); + if (dataPKColumn.getViewConstant() != null) { + nViewConstants++; + } + } + if (nViewConstants > 0) { + byte[][] viewConstants = new byte[nViewConstants][]; + int j = 0; + for (int i = dataPosOffset; i < dataPkColumns.size(); i++) { + PColumn dataPkColumn = dataPkColumns.get(i); + if (dataPkColumn.getViewConstant() != null) { + if (IndexUtil.getViewConstantValue(dataPkColumn, ptr)) { + viewConstants[j++] = ByteUtil.copyKeyBytesIfNecessary(ptr); + } else { + throw new IllegalStateException(); + } + } + } + serializeViewConstantsIntoScan(viewConstants, scan); + } + } + } + + private static void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + DataOutputStream output = new DataOutputStream(stream); + WritableUtils.writeVInt(output, viewConstants.length); + for (byte[] viewConstant : viewConstants) { + Bytes.writeByteArray(output, viewConstant); + } + scan.setAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS, stream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private static List<PColumn> getMappedColumns(PTable pTable) { + int initPosition = + (pTable.getBucketNum() ==null ? 0 : 1) + + (pTable.isMultiTenant() ? 1 : 0) + + (pTable.getViewIndexId() == null ? 
0 : 1); + List<PColumn> columns = Lists.newArrayListWithExpectedSize(pTable.getColumns().size() - initPosition); + for (int i = initPosition; i < pTable.getPKColumns().size(); i++) { + columns.add(pTable.getPKColumns().get(i)); + } + for (PColumnFamily family : pTable.getColumnFamilies()) { + for (PColumn column : family.getColumns()) { + columns.add(column); + } + } + + return columns; + } + + private static List<PColumn> getDataTableMappedColumns(TableRef dataTableRef, List<PColumn> mappedColumns) { + Set<String> names = Sets.newHashSet(); + for (PColumn column : mappedColumns) { + names.add(column.getName().getString()); + } + List<PColumn> projectedColumns = new ArrayList<PColumn>(); + for (PColumnFamily cf : dataTableRef.getTable().getColumnFamilies()) { + for (PColumn sourceColumn : cf.getColumns()) { + String colName = IndexUtil.getIndexColumnName(sourceColumn); + if (!names.contains(colName)) { + ColumnRef sourceColumnRef = + new ColumnRef(dataTableRef, sourceColumn.getPosition()); + PColumn column = new ProjectedColumn(PNameFactory.newName(colName), + cf.getName(), projectedColumns.size(), + sourceColumn.isNullable(), sourceColumnRef); + projectedColumns.add(column); + } + } + } + + return projectedColumns; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java index 2ddc3ea,0000000..cdcd74f mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java @@@ -1,52 -1,0 +1,51 @@@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.calcite.PhoenixSequence; +import 
org.apache.phoenix.calcite.TableMapping; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.SequenceValueExpression; +import org.apache.phoenix.execute.RuntimeContext; +import org.apache.phoenix.execute.TupleProjector; - import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.parse.SequenceValueParseNode; +import org.apache.phoenix.schema.types.PDataType; + +/** Holds context for an traversal over a tree of relational expressions + * to convert it to an executable plan. */ +public interface PhoenixRelImplementor { + QueryPlan visitInput(int i, PhoenixQueryRel input); - ColumnExpression newColumnExpression(int index); ++ Expression newColumnExpression(int index); + @SuppressWarnings("rawtypes") + Expression newBindParameterExpression(int index, PDataType type, Integer maxLength); + @SuppressWarnings("rawtypes") + Expression newFieldAccessExpression(String variableId, int index, PDataType type); + SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op); + RuntimeContext getRuntimeContext(); + void setTableMapping(TableMapping tableMapping); + TableMapping getTableMapping(); + void setSequenceManager(SequenceManager sequenceManager); + void pushContext(ImplementorContext context); + ImplementorContext popContext(); + ImplementorContext getCurrentContext(); + TupleProjector project(List<Expression> exprs); + + class ImplementorContext { + public final boolean retainPKColumns; + public final boolean forceProject; + public final ImmutableIntList columnRefList; + + public ImplementorContext(boolean retainPKColumns, boolean forceProject, ImmutableIntList columnRefList) { + this.retainPKColumns = retainPKColumns; + this.forceProject = forceProject; + this.columnRefList = columnRefList; + } + + public ImplementorContext withColumnRefList(ImmutableIntList columnRefList) { + return 
new ImplementorContext(this.retainPKColumns, this.forceProject, columnRefList); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index fa4649b,0000000..d581ec0 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@@ -1,140 -1,0 +1,139 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Stack; + +import org.apache.phoenix.calcite.PhoenixSequence; +import org.apache.phoenix.calcite.TableMapping; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.SequenceValueExpression; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.RuntimeContext; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.BindParameterExpression; - import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.parse.ParseNodeFactory; +import org.apache.phoenix.parse.SequenceValueParseNode; +import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import 
org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.types.PDataType; +import com.google.common.collect.Lists; + +public class PhoenixRelImplementorImpl implements PhoenixRelImplementor { + private final RuntimeContext runtimeContext; + private Stack<ImplementorContext> contextStack; + private SequenceManager sequenceManager; + private TableMapping tableMapping; + + public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + this.contextStack = new Stack<ImplementorContext>(); + } + + @Override + public QueryPlan visitInput(int i, PhoenixQueryRel input) { + return input.implement(this); + } + + @Override - public ColumnExpression newColumnExpression(int index) { ++ public Expression newColumnExpression(int index) { + return tableMapping.newColumnExpression(index); + } + + @SuppressWarnings("rawtypes") + @Override + public Expression newBindParameterExpression(int index, PDataType type, Integer maxLength) { + return new BindParameterExpression(index, type, maxLength, runtimeContext); + } + + @SuppressWarnings("rawtypes") + @Override + public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { + Expression fieldAccessExpr = runtimeContext.getCorrelateVariable(variableId).newExpression(index); + return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr); + } + + @Override + public SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op) { + PName tenantName = seq.pc.getTenantId(); + TableName tableName = TableName.create(seq.schemaName, seq.sequenceName); + try { + return sequenceManager.newSequenceReference(tenantName, tableName, null, op); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public RuntimeContext getRuntimeContext() { + return runtimeContext; + } + + @Override + public void setTableMapping(TableMapping tableMapping) { + this.tableMapping = 
tableMapping; + } + + @Override + public TableMapping getTableMapping() { + return this.tableMapping; + } + + @Override + public void setSequenceManager(SequenceManager sequenceManager) { + this.sequenceManager = sequenceManager; + } + + @Override + public void pushContext(ImplementorContext context) { + this.contextStack.push(context); + } + + @Override + public ImplementorContext popContext() { + return contextStack.pop(); + } + + @Override + public ImplementorContext getCurrentContext() { + return contextStack.peek(); + } + + @Override + public TupleProjector project(List<Expression> exprs) { + KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); + List<PColumn> columns = Lists.<PColumn>newArrayList(); + for (int i = 0; i < exprs.size(); i++) { + String name = ParseNodeFactory.createTempAlias(); + Expression expr = exprs.get(i); + builder.addField(expr); + columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), + expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), + i, expr.getSortOrder(), null, null, false, name, false, false)); + } + try { + PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, + PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, + null, null, columns, null, null, Collections.<PTable>emptyList(), + false, Collections.<PName>emptyList(), null, null, false, false, false, null, + null, null, true, false, 0, 0, false, null, false); + this.setTableMapping(new TableMapping(pTable)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java ---------------------------------------------------------------------- diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java index 0915d09,0000000..579914a mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java @@@ -1,395 -1,0 +1,396 @@@ +package org.apache.phoenix.calcite.rel; + +import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO; + +import java.sql.ParameterMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare.CatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableModify; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.StatementPlan; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.execute.MutationState.RowMutationState; +import org.apache.phoenix.execute.MutationState.RowTimestampColInfo; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.iterate.ResultIterator; +import 
org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.IllegalDataException; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PRow; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class PhoenixTableModify extends TableModify implements PhoenixRel { + + public PhoenixTableModify(RelOptCluster cluster, RelTraitSet traits, + RelOptTable table, CatalogReader catalogReader, RelNode child, + Operation operation, List<String> updateColumnList, boolean flattened) { + super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened); + assert operation == Operation.INSERT || operation == Operation.DELETE; + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new PhoenixTableModify( + getCluster(), + traitSet, + getTable(), + getCatalogReader(), + sole(inputs), + getOperation(), + getUpdateColumnList(), + isFlattened()); + } + + @Override + public StatementPlan implement(PhoenixRelImplementor implementor) { + final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input); + final RowProjector projector = implementor.getTableMapping().createRowProjector(); + + final PhoenixTable targetTable = getTable().unwrap(PhoenixTable.class); + final PhoenixConnection 
connection = targetTable.pc; + final TableRef targetTableRef = targetTable.tableMapping.getTableRef(); + + if (getOperation() == Operation.INSERT) { + return upsert(connection, targetTable, targetTableRef, queryPlan, projector); + } + + // delete + return delete(connection, targetTable, targetTableRef, queryPlan, projector); + } + + private static MutationPlan upsert(final PhoenixConnection connection, + final PhoenixTable targetTable, final TableRef targetTableRef, + final QueryPlan queryPlan, final RowProjector projector) { + try (PhoenixStatement stmt = new PhoenixStatement(connection)) { + final ColumnResolver resolver = FromCompiler.getResolver(targetTableRef); + final StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + + // TODO TenantId, ViewIndexId, UpdatableViewColumns + final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns(); + final int[] columnIndexes = new int[mappedColumns.size()]; + final int[] pkSlotIndexes = new int[mappedColumns.size()]; + for (int i = 0; i < columnIndexes.length; i++) { + PColumn column = mappedColumns.get(i); + if (SchemaUtil.isPKColumn(column)) { + pkSlotIndexes[i] = column.getPosition(); + } + columnIndexes[i] = column.getPosition(); + } + // TODO + final boolean useServerTimestamp = false; + + return new MutationPlan() { + @Override + public ParameterMetaData getParameterMetaData() { + return queryPlan.getContext().getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public TableRef getTargetRef() { + return targetTableRef; + } + + @Override + public Set<TableRef> getSourceRefs() { + // TODO return originalQueryPlan.getSourceRefs(); + return queryPlan.getSourceRefs(); + } + + @Override + public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() { + return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT; + } + + @Override + public 
MutationState execute() throws SQLException { + ResultIterator iterator = queryPlan.iterator(); + // simplest version, no run-on-server, no pipelined update + StatementContext childContext = queryPlan.getContext(); + ConnectionQueryServices services = connection.getQueryServices(); + int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); + boolean isAutoCommit = connection.getAutoCommit(); + byte[][] values = new byte[columnIndexes.length][]; + int rowCount = 0; + Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); + PTable table = targetTableRef.getTable(); + try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + while (rs.next()) { + for (int i = 0; i < values.length; i++) { + PColumn column = table.getColumns().get(columnIndexes[i]); + byte[] bytes = rs.getBytes(i + 1); + ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes); + Object value = rs.getObject(i + 1); + int rsPrecision = rs.getMetaData().getPrecision(i + 1); + Integer precision = rsPrecision == 0 ? null : rsPrecision; + int rsScale = rs.getMetaData().getScale(i + 1); + Integer scale = rsScale == 0 ? null : rsScale; + // We are guaranteed that the two columns will have compatible types, + // as we checked that before. 
- if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale, - column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder( ++ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), SortOrder.getDefault(), precision, ++ scale, column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString()) + .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build() + .buildException(); } + column.getDataType().coerceBytes(ptr, value, column.getDataType(), + precision, scale, SortOrder.getDefault(), + column.getMaxLength(), column.getScale(), column.getSortOrder(), + table.rowKeyOrderOptimizable()); + values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); + } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp); ++ // TODO onDupKeyBytes ++ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp, null); + rowCount++; + // Commit a batch if auto commit is true and we're at our batch size + if (isAutoCommit && rowCount % batchSize == 0) { + MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection); + connection.getMutationState().join(state); + connection.getMutationState().send(); + mutation.clear(); + } + } + // If auto commit is true, this last batch will be committed upon return + return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection); + } + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("UPSERT SELECT"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }; + } catch (SQLException e) 
{ + throw new RuntimeException(e); + } + } + - private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp) { ++ private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp, byte[] onDupKeyBytes) { + Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); + byte[][] pkValues = new byte[table.getPKColumns().size()][]; + // If the table uses salting, the first byte is the salting byte, set to an empty array + // here and we will fill in the byte later in PRowImpl. + if (table.getBucketNum() != null) { + pkValues[0] = new byte[] {0}; + } + Long rowTimestamp = null; // case when the table doesn't have a row timestamp column + RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp); + for (int i = 0; i < values.length; i++) { + byte[] value = values[i]; + PColumn column = table.getColumns().get(columnIndexes[i]); + if (SchemaUtil.isPKColumn(column)) { + pkValues[pkSlotIndex[i]] = value; + if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) { + if (!useServerTimestamp) { + PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos()); + rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder()); + if (rowTimestamp < 0) { + throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero"); + } + rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp); + } + } + } else { + columnValues.put(column, value); + } + } + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); + table.newKey(ptr, pkValues); - mutation.put(ptr, new RowMutationState(columnValues, 
connection.getStatementExecutionCounter(), rowTsColInfo)); ++ mutation.put(ptr, new RowMutationState(columnValues, connection.getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); + } + + private static MutationPlan delete(final PhoenixConnection connection, + final PhoenixTable targetTable, final TableRef targetTableRef, + final QueryPlan queryPlan, final RowProjector projector) { + final StatementContext context = queryPlan.getContext(); + // TODO + final boolean deleteFromImmutableIndexToo = false; + return new MutationPlan() { + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public TableRef getTargetRef() { + return targetTableRef; + } + + @Override + public Set<TableRef> getSourceRefs() { + // TODO dataPlan.getSourceRefs(); + return queryPlan.getSourceRefs(); + } + + @Override + public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() { + return org.apache.phoenix.jdbc.PhoenixStatement.Operation.DELETE; + } + + @Override + public MutationState execute() throws SQLException { + ResultIterator iterator = queryPlan.iterator(); + try { + // TODO hasLimit?? + return deleteRows(context, targetTableRef, deleteFromImmutableIndexToo ? 
queryPlan.getTableRef() : null, iterator, projector, queryPlan.getTableRef()); + } finally { + iterator.close(); + } + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("DELETE ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }; + } + + private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { + PTable table = targetTableRef.getTable(); + PhoenixStatement statement = childContext.getStatement(); + PhoenixConnection connection = statement.getConnection(); + PName tenantId = connection.getTenantId(); + byte[] tenantIdBytes = null; + if (tenantId != null) { + tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, table.getViewIndexId() != null); + } + final boolean isAutoCommit = connection.getAutoCommit(); + ConnectionQueryServices services = connection.getQueryServices(); + final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); + Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); + Map<ImmutableBytesPtr,RowMutationState> indexMutations = null; + // If indexTableRef is set, we're deleting the rows from both the index table and + // the data table through a single query to save executing an additional one. 
+ if (indexTableRef != null) { + indexMutations = Maps.newHashMapWithExpectedSize(batchSize); + } + List<PColumn> pkColumns = table.getPKColumns(); + boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null; + boolean isSharedViewIndex = table.getViewIndexId() != null; + int offset = (table.getBucketNum() == null ? 0 : 1); + byte[][] values = new byte[pkColumns.size()][]; + if (isMultiTenant) { + values[offset++] = tenantIdBytes; + } + if (isSharedViewIndex) { + values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId()); + } + try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { + int rowCount = 0; + while (rs.next()) { + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + // Use tuple directly, as projector would not have all the PK columns from + // our index table inside of our projection. Since the tables are equal, + // there's no translation required. + if (sourceTableRef.equals(targetTableRef)) { + rs.getCurrentRow().getKey(ptr); + } else { + for (int i = offset; i < values.length; i++) { + byte[] byteValue = rs.getBytes(i+1-offset); + // The ResultSet.getBytes() call will have inverted it - we need to invert it back. + // TODO: consider going under the hood and just getting the bytes + if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) { + byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length); + byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length); + } + values[i] = byteValue; + } + table.newKey(ptr, values); + } + // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the + // row key will already have its value. 
- mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); ++ mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + if (indexTableRef != null) { + ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + rs.getCurrentRow().getKey(indexPtr); - indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); ++ indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + } + if (mutations.size() > maxSize) { + throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); + } + rowCount++; + // Commit a batch if auto commit is true and we're at our batch size + if (isAutoCommit && rowCount % batchSize == 0) { + MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection); + connection.getMutationState().join(state); + if (indexTableRef != null) { + MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + connection.getMutationState().join(indexState); + } + connection.getMutationState().send(); + mutations.clear(); + if (indexMutations != null) { + indexMutations.clear(); + } + } + } + + // If auto commit is true, this last batch will be committed upon return + int nCommittedRows = rowCount / batchSize * batchSize; + MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection); + if (indexTableRef != null) { + // To prevent the counting of these index rows, we have a negative for remainingRows. 
+ MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + state.join(indexState); + } + return state; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java index c24ab57,4159be1..bdc4da9 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java @@@ -119,18 -119,17 +119,21 @@@ public class SequenceManager return dstSequenceValues[index]; } } - + public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) throws SQLException { PName tenantName = statement.getConnection().getTenantId(); - String tenantId = tenantName == null ? null : tenantName.getString(); TableName tableName = node.getTableName(); + ParseNode numToAllocateNode = node.getNumToAllocateNode(); + return newSequenceReference(tenantName, tableName, numToAllocateNode, node.getOp()); + } + + public SequenceValueExpression newSequenceReference(PName tenantName, + TableName tableName, ParseNode numToAllocateNode, SequenceValueParseNode.Op op) throws SQLException { + String tenantId = tenantName == null ? 
null : tenantName.getString(); + if (tableName.getSchemaName() == null && statement.getConnection().getSchema() != null) { + tableName = TableName.create(statement.getConnection().getSchema(), tableName.getTableName()); + } int nSaltBuckets = statement.getConnection().getQueryServices().getSequenceSaltBuckets(); - ParseNode numToAllocateNode = node.getNumToAllocateNode(); - long numToAllocate = determineNumToAllocate(tableName, numToAllocateNode); SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName(), nSaltBuckets); SequenceValueExpression expression = sequenceMap.get(key); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 5875c0a,25f3bec..45cf6bb --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@@ -142,19 -145,13 +145,17 @@@ public abstract class 
BaseResultIterato } protected boolean useStats() { + return useStats(this.context); + } + + private static boolean useStats(StatementContext context) { - Scan scan = context.getScan(); - boolean isPointLookup = context.getScanRanges().isPointLookup(); /* * Don't use guide posts: * 1) If we're collecting stats, as in this case we need to scan entire * regions worth of data to track where to put the guide posts. * 2) If the query is going to be executed serially. */ -- if (ScanUtil.isAnalyzeTable(scan)) { ++ if (ScanUtil.isAnalyzeTable(context.getScan())) { return false; } return true; @@@ -421,55 -422,40 +426,39 @@@ return guideIndex; } - private GuidePostsInfo getGuidePosts() throws SQLException { - if (!useStats() || !StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))) { + private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions, boolean useStats) throws SQLException { - if (!useStats) { return GuidePostsInfo.NO_GUIDEPOST; } ++ final byte[] physicalTableName = table.getPhysicalName().getBytes(); ++ if (!useStats(context) || !StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))) { + return GuidePostsInfo.NO_GUIDEPOST; + } - GuidePostsInfo gps = null; - PTableStats tableStats = new MetaDataClient(context.getConnection()).getTableStats(table); - Map<byte[], GuidePostsInfo> guidePostMap = tableStats.getGuidePosts(); - TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) { + byte[] cf = where.getFirst(); + if (cf != null) { + whereConditions.add(cf); + } + } - PTable table = getTable(); - byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable()); + byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table); - if (table.getColumnFamilies().isEmpty()) { - // For sure we can get the defaultCF from the table - gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF); - } else { - if 
(whereConditions.isEmpty() || whereConditions.contains(defaultCF)) { - gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF); - } else { - byte[] familyInWhere = whereConditions.iterator().next(); - GuidePostsInfo guidePostsInfo = guidePostMap.get(familyInWhere); - if (guidePostsInfo != null) { - gps = guidePostsInfo; - } else { - // As there are no guideposts collected for the where family we go with the default CF - gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF); + byte[] cf = null; + if ( !table.getColumnFamilies().isEmpty() && !whereConditions.isEmpty() ) { + for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) { + byte[] whereCF = where.getFirst(); + if (Bytes.compareTo(defaultCF, whereCF) == 0) { + cf = defaultCF; + break; } } + if (cf == null) { + cf = context.getWhereConditionColumns().get(0).getFirst(); + } } - if (gps == null) { return GuidePostsInfo.NO_GUIDEPOST; } - return gps; - } - - private static GuidePostsInfo getDefaultFamilyGuidePosts(Map<byte[], GuidePostsInfo> guidePostMap, byte[] defaultCF) { - if (guidePostMap.get(defaultCF) != null) { - return guidePostMap.get(defaultCF); + if (cf == null) { + cf = defaultCF; } - return null; + GuidePostsKey key = new GuidePostsKey(physicalTableName, cf); + return context.getConnection().getQueryServices().getTableStats(key); } - private static String toString(List<byte[]> gps) { - StringBuilder buf = new StringBuilder(gps.size() * 100); - buf.append("["); - for (int i = 0; i < gps.size(); i++) { - buf.append(Bytes.toStringBinary(gps.get(i))); - buf.append(","); - if (i > 0 && i < gps.size()-1 && (i % 10) == 0) { - buf.append("\n"); - } - } - buf.setCharAt(buf.length()-1, ']'); - return buf.toString(); - } - private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) { boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary); 
if (scan != null) { @@@ -683,130 -662,6 +672,130 @@@ return parallelScans; } + + /** + * Compute the estimated count of rows and bytes that will be scanned. + * @return the estimated row count and the byte count. + * @throws SQLException + */ + public static Pair<Long, Long> getEstimatedCount(StatementContext context, PTable table) throws SQLException { + if (table.getName() == null) { // empty table + return new Pair<Long, Long>(null, null); + } + + if (context.getScanRanges().isPointLookup()) { + return new Pair<Long, Long>(1L, SchemaUtil.estimateRowSize(table)); + } + + TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) { + byte[] cf = where.getFirst(); + if (cf != null) { + whereConditions.add(cf); + } + } + GuidePostsInfo gps = getGuidePosts(context, table, whereConditions, useStats(context)); + if (gps == GuidePostsInfo.NO_GUIDEPOST) { + return new Pair<Long, Long>(null, null); + } + + byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY; + byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY; + Scan scan = context.getScan(); + List<HRegionLocation> regionLocations = context.getConnection().getQueryServices() + .getAllTableRegions(table.getPhysicalName().getBytes()); + List<byte[]> regionBoundaries = toBoundaries(regionLocations); + ScanRanges scanRanges = context.getScanRanges(); + boolean isSalted = table.getBucketNum() != null; + boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL; + boolean traverseAllRegions = isSalted || isLocalIndex; + if (!traverseAllRegions) { + byte[] scanStartRow = scan.getStartRow(); + if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) { + startKey = scanStartRow; + } + byte[] scanStopRow = scan.getStopRow(); + if (stopKey.length == 0 + || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) { + stopKey = scanStopRow; + } + } + + int regionIndex = 0; + int stopIndex = regionBoundaries.size(); + if 
(startKey.length > 0) { + regionIndex = getIndexContainingInclusive(regionBoundaries, startKey); + } + if (stopKey.length > 0) { + stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey)); + if (isLocalIndex) { + stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey(); + } + } + + ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey); + + int gpsSize = gps.getGuidePostsCount(); + int keyOffset = 0; + ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY; + ImmutableBytesWritable guidePosts = gps.getGuidePosts(); + ByteArrayInputStream stream = null; + DataInput input = null; + PrefixByteDecoder decoder = null; + int guideIndex = 0; + long estimatedRows = 0; + long estimatedSize = 0; + try { + if (gpsSize > 0) { + stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength()); + input = new DataInputStream(stream); + decoder = new PrefixByteDecoder(gps.getMaxLength()); + try { + while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0 + && currentKey.getLength() != 0) { + guideIndex++; + } + } catch (EOFException e) {} + } + byte[] currentKeyBytes = currentKey.copyBytes(); + + // Merge bisect with guideposts for all but the last region + while (regionIndex <= stopIndex) { + byte[] currentGuidePostBytes = currentGuidePost.copyBytes(); + byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY; + if (regionIndex == stopIndex) { + endKey = stopKey; + } else { + endKey = regionBoundaries.get(regionIndex); + } + HRegionLocation regionLocation = regionLocations.get(regionIndex); + if (isLocalIndex) { + HRegionInfo regionInfo = regionLocation.getRegionInfo(); + endRegionKey = regionInfo.getEndKey(); + keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); + } + try { + while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 
0)) { + Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset, + false); + if (newScan != null) { - estimatedRows += gps.getRowCounts().get(guideIndex); - estimatedSize += gps.getByteCounts().get(guideIndex); ++ estimatedRows += gps.getRowCounts()[guideIndex]; ++ estimatedSize += gps.getByteCounts()[guideIndex]; + } + currentKeyBytes = currentGuidePost.copyBytes(); + currentGuidePost = PrefixByteCodec.decode(decoder, input); + currentGuidePostBytes = currentGuidePost.copyBytes(); + guideIndex++; + } + } catch (EOFException e) {} + currentKeyBytes = endKey; + regionIndex++; + } + } finally { + if (stream != null) Closeables.closeQuietly(stream); + } + return new Pair<Long, Long>(estimatedRows, estimatedSize); + } public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) { if (!reverse) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java index d11fd12,0be7c16..e8b0593 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java @@@ -50,7 -59,21 +59,21 @@@ public class ColumnDef private final String expressionStr; private final boolean isRowTimestamp; + public ColumnDef(ColumnDef def, String expressionStr) { + this.columnDefName = def.columnDefName; + this.dataType = def.dataType; + this.isNull 
= def.isNull; + this.maxLength = def.maxLength; + this.scale = def.scale; + this.isPK = def.isPK; + this.sortOrder = def.sortOrder; + this.isArray = def.isArray; + this.arrSize = def.arrSize; + this.isRowTimestamp = def.isRowTimestamp; + this.expressionStr = expressionStr; + } + - ColumnDef(ColumnName columnDefName, String sqlTypeName, boolean isArray, Integer arrSize, Boolean isNull, Integer maxLength, + public ColumnDef(ColumnName columnDefName, String sqlTypeName, boolean isArray, Integer arrSize, Boolean isNull, Integer maxLength, Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr, boolean isRowTimestamp) { try { Preconditions.checkNotNull(sortOrder); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index 512a1d2,07b412f..9ed4d1c --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@@ -124,11 -133,13 +133,13 @@@ public class StatisticsUtil return key; } - public static List<Result> readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf, + public static List<Result> readStatisticsForDelete(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf, byte[] startKey, byte[] stopKey, long clientTimeStamp) throws IOException { List<Result> statsForRegion = new ArrayList<Result>(); - Scan s = MetaDataUtil.newTableRowsScan(getAdjustedKey(startKey, tableNameBytes, cf, false), - getAdjustedKey(stopKey, tableNameBytes, cf, true), MetaDataProtocol.MIN_TABLE_TIMESTAMP, + Scan s = MetaDataUtil.newTableRowsScan( + getAdjustedKey(startKey, tableNameBytes, cf, false), + getAdjustedKey(stopKey, tableNameBytes, cf, true), + 
MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp); s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES); ResultScanner scanner = null; @@@ -144,26 -155,16 +155,39 @@@ } } return statsForRegion; + } + - public static PTableStats readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, long clientTimeStamp) - throws IOException { - return readStatistics(statsHTable, tableNameBytes, null, null, null, clientTimeStamp); ++ public static List<Result> readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf, ++ byte[] startKey, byte[] stopKey, long clientTimeStamp) throws IOException { ++ List<Result> statsForRegion = new ArrayList<Result>(); ++ Scan s = MetaDataUtil.newTableRowsScan( ++ getAdjustedKey(startKey, tableNameBytes, cf, false), ++ getAdjustedKey(stopKey, tableNameBytes, cf, true), ++ MetaDataProtocol.MIN_TABLE_TIMESTAMP, ++ clientTimeStamp); ++ ResultScanner scanner = null; ++ try { ++ scanner = statsHTable.getScanner(s); ++ Result result = null; ++ while ((result = scanner.next()) != null) { ++ statsForRegion.add(result); ++ } ++ } finally { ++ if (scanner != null) { ++ scanner.close(); ++ } ++ } ++ return statsForRegion; } - public static PTableStats readStatistics(HTableInterface statsHTable, - byte[] tableNameBytes, ImmutableBytesPtr cf, byte[] startKey, byte[] stopKey, - long clientTimeStamp) + public static GuidePostsInfo readStatistics(HTableInterface statsHTable, GuidePostsKey key, long clientTimeStamp) throws IOException { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - Scan s; - if (cf == null) { - s = MetaDataUtil.newTableRowsScan(tableNameBytes, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp); - } else { - s = MetaDataUtil.newTableRowsScan(getAdjustedKey(startKey, tableNameBytes, cf, false), - getAdjustedKey(stopKey, tableNameBytes, cf, true), MetaDataProtocol.MIN_TABLE_TIMESTAMP, - clientTimeStamp); - } + ptr.set(key.getColumnFamily()); + byte[] 
tableNameBytes = key.getPhysicalName(); + byte[] startKey = getStartKey(tableNameBytes, ptr); + byte[] endKey = getEndKey(tableNameBytes, ptr); + Scan s = MetaDataUtil.newTableRowsScan(startKey, endKey, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp); s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES); s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT_BYTES); s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java index 186370a,62aafa5..a009b5c --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java @@@ -269,28 -268,4 +269,32 @@@ public class CorrelatePlanTest throw new RuntimeException(e); } } + + private static class CorrelateVariableImpl implements CorrelateVariable { + private final TableRef tableRef; + private Tuple value; + + CorrelateVariableImpl(TableRef tableRef) { + this.tableRef = tableRef; + } + + @Override 
+ public Expression newExpression(int index) { - return new ColumnRef(tableRef, index).newColumnExpression(); ++ try { ++ return new ColumnRef(tableRef, index).newColumnExpression(); ++ } catch (SQLException e) { ++ throw new RuntimeException(e); ++ } + } + + @Override + public Tuple getValue() { + return value; + } + + @Override + public void setValue(Tuple value) { + this.value = value; + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/pom.xml ----------------------------------------------------------------------