Repository: phoenix Updated Branches: refs/heads/calcite 739b68cf2 -> 7c662fd35
Add transaction related code for query and DML (PHOENIX-2197 Support DML in Phoenix/Calcite integration) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7c662fd3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7c662fd3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7c662fd3 Branch: refs/heads/calcite Commit: 7c662fd35d8095c1f1096a2fd74eb0bcb58659ab Parents: 739b68c Author: maryannxue <maryann....@gmail.com> Authored: Thu May 5 10:41:47 2016 -0400 Committer: maryannxue <maryann....@gmail.com> Committed: Thu May 5 10:41:47 2016 -0400 ---------------------------------------------------------------------- .../apache/phoenix/calcite/BaseCalciteIT.java | 1 + .../apache/phoenix/calcite/CalciteRuntime.java | 46 ++++- .../phoenix/calcite/rel/PhoenixTableModify.java | 200 ++++++++++--------- 3 files changed, 147 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java index cb7d01d..dc21809 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java @@ -122,6 +122,7 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT { public void close() { if (connection != null) { try { + connection.commit(); connection.close(); } catch (SQLException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java index a3feffa..df036a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java @@ -8,9 +8,12 @@ import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.StatementPlan; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDate; @@ -34,6 +37,7 @@ import org.apache.phoenix.schema.types.PhoenixArray; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.Iterator; /** * Methods used by code generated by Calcite. @@ -60,13 +64,26 @@ public class CalciteRuntime { }; } - public static Enumerator<Object> toEnumerator(QueryPlan plan) throws SQLException { - final ResultIterator iterator = plan.iterator(); + public static Enumerator<Object> toEnumerator(final QueryPlan plan) throws SQLException { final RowProjector rowProjector = plan.getProjector(); final int count = rowProjector.getColumnCount(); return new Enumerator<Object>() { + ResultIterator iterator = null; Object current; - private final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + + void init() throws SQLException { + final StatementContext context = plan.getContext(); + final PhoenixConnection connection = context.getConnection(); + Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator(); + connection.getMutationState().sendUncommitted(tableRefs); + iterator = plan.iterator(); + context.getOverallQueryMetrics().startQuery(); + if (connection.getAutoCommit()) { + connection.commit(); + } + connection.incrementStatementExecutionCounter(); + } @Override public Object current() { @@ -76,6 +93,9 @@ public class CalciteRuntime { @Override public boolean moveNext() { try { + if (iterator == null) { + init(); + } final Tuple tuple = iterator.next(); if (tuple == null) { current = null; @@ -179,9 +199,23 @@ public class CalciteRuntime { } try { - MutationState state = plan.execute(); - updateCount = state.getUpdateCount(); - state.commit(); + final PhoenixConnection connection = plan.getContext().getConnection(); + final MutationState state = connection.getMutationState(); + if (plan.getTargetRef() != null + && plan.getTargetRef().getTable() != null + && plan.getTargetRef().getTable().isTransactional()) { + state.startTransaction(); + } + final Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator(); + state.sendUncommitted(tableRefs); + state.checkpointIfNeccessary(plan); + final MutationState lastState = plan.execute(); + state.join(lastState); + if (connection.getAutoCommit()) { + connection.commit(); + } + updateCount = lastState.getUpdateCount(); + connection.incrementStatementExecutionCounter(); } catch (SQLException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c662fd3/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java index 6292070..6629cb4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java @@ -13,12 +13,16 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.prepare.Prepare.CatalogReader; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.TableModify; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.StatementPlan; import org.apache.phoenix.exception.SQLExceptionCode; @@ -30,6 +34,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -73,113 +78,120 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel { final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input); final RowProjector projector = implementor.getTableMapping().createRowProjector(); + final PhoenixTable targetTable = getTable().unwrap(PhoenixTable.class); final PhoenixConnection connection = targetTable.pc; final TableRef targetTableRef = targetTable.tableMapping.getTableRef(); - final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns(); - final int[] columnIndexes = new int[mappedColumns.size()]; - final int[] pkSlotIndexes = new int[mappedColumns.size()]; - for (int i = 0; i < columnIndexes.length; i++) { - PColumn column = mappedColumns.get(i); - if (SchemaUtil.isPKColumn(column)) { - pkSlotIndexes[i] = column.getPosition(); - } - columnIndexes[i] = column.getPosition(); - } - // TODO - final boolean useServerTimestamp = false; - - return new MutationPlan() { - @Override - public ParameterMetaData getParameterMetaData() { - return queryPlan.getContext().getBindManager().getParameterMetaData(); - } + try (PhoenixStatement stmt = new PhoenixStatement(connection)) { + final ColumnResolver resolver = FromCompiler.getResolver(targetTableRef); + final StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); - @Override - public StatementContext getContext() { - return queryPlan.getContext(); + final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns(); + final int[] columnIndexes = new int[mappedColumns.size()]; + final int[] pkSlotIndexes = new int[mappedColumns.size()]; + for (int i = 0; i < columnIndexes.length; i++) { + PColumn column = mappedColumns.get(i); + if (SchemaUtil.isPKColumn(column)) { + pkSlotIndexes[i] = column.getPosition(); + } + columnIndexes[i] = column.getPosition(); } + // TODO + final boolean useServerTimestamp = false; + + return new MutationPlan() { + @Override + public ParameterMetaData getParameterMetaData() { + return queryPlan.getContext().getBindManager().getParameterMetaData(); + } - @Override - public TableRef getTargetRef() { - return targetTableRef; - } + @Override + public StatementContext getContext() { + return context; + } - @Override - public Set<TableRef> getSourceRefs() { - // TODO return originalQueryPlan.getSourceRefs(); - return queryPlan.getSourceRefs(); - } + @Override + public TableRef getTargetRef() { + return targetTableRef; + } - @Override - public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() { - return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT; - } + @Override + public Set<TableRef> getSourceRefs() { + // TODO return originalQueryPlan.getSourceRefs(); + return queryPlan.getSourceRefs(); + } - @Override - public MutationState execute() throws SQLException { - ResultIterator iterator = queryPlan.iterator(); - // simplest version, no run-on-server, no pipelined update - StatementContext childContext = queryPlan.getContext(); - ConnectionQueryServices services = connection.getQueryServices(); - int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, - QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); - int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); - boolean isAutoCommit = connection.getAutoCommit(); - byte[][] values = new byte[columnIndexes.length][]; - int rowCount = 0; - Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); - PTable table = targetTableRef.getTable(); - try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - while (rs.next()) { - for (int i = 0; i < values.length; i++) { - PColumn column = table.getColumns().get(columnIndexes[i]); - byte[] bytes = rs.getBytes(i + 1); - ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes); - Object value = rs.getObject(i + 1); - int rsPrecision = rs.getMetaData().getPrecision(i + 1); - Integer precision = rsPrecision == 0 ? null : rsPrecision; - int rsScale = rs.getMetaData().getScale(i + 1); - Integer scale = rsScale == 0 ? null : rsScale; - // We are guaranteed that the two column will have compatible types, - // as we checked that before. - if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale, - column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder( - SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString()) - .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build() - .buildException(); } - column.getDataType().coerceBytes(ptr, value, column.getDataType(), - precision, scale, SortOrder.getDefault(), - column.getMaxLength(), column.getScale(), column.getSortOrder(), - table.rowKeyOrderOptimizable()); - values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); - } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp); - rowCount++; - // Commit a batch if auto commit is true and we're at our batch size - if (isAutoCommit && rowCount % batchSize == 0) { - MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection); - connection.getMutationState().join(state); - connection.getMutationState().send(); - mutation.clear(); + @Override + public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() { + return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT; + } + + @Override + public MutationState execute() throws SQLException { + ResultIterator iterator = queryPlan.iterator(); + // simplest version, no run-on-server, no pipelined update + StatementContext childContext = queryPlan.getContext(); + ConnectionQueryServices services = connection.getQueryServices(); + int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); + boolean isAutoCommit = connection.getAutoCommit(); + byte[][] values = new byte[columnIndexes.length][]; + int rowCount = 0; + Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); + PTable table = targetTableRef.getTable(); + try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + while (rs.next()) { + for (int i = 0; i < values.length; i++) { + PColumn column = table.getColumns().get(columnIndexes[i]); + byte[] bytes = rs.getBytes(i + 1); + ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes); + Object value = rs.getObject(i + 1); + int rsPrecision = rs.getMetaData().getPrecision(i + 1); + Integer precision = rsPrecision == 0 ? null : rsPrecision; + int rsScale = rs.getMetaData().getScale(i + 1); + Integer scale = rsScale == 0 ? null : rsScale; + // We are guaranteed that the two column will have compatible types, + // as we checked that before. + if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale, + column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString()) + .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build() + .buildException(); } + column.getDataType().coerceBytes(ptr, value, column.getDataType(), + precision, scale, SortOrder.getDefault(), + column.getMaxLength(), column.getScale(), column.getSortOrder(), + table.rowKeyOrderOptimizable()); + values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); + } + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp); + rowCount++; + // Commit a batch if auto commit is true and we're at our batch size + if (isAutoCommit && rowCount % batchSize == 0) { + MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection); + connection.getMutationState().join(state); + connection.getMutationState().send(); + mutation.clear(); + } } + // If auto commit is true, this last batch will be committed upon return + return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection); } - // If auto commit is true, this last batch will be committed upon return - return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection); } - } - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); - List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("UPSERT SELECT"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } - - }; + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("UPSERT SELECT"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }; + } catch (SQLException e) { + throw new RuntimeException(e); + } } private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp) {