Repository: phoenix
Updated Branches:
  refs/heads/calcite 405499047 -> 29da79fa6

PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to tenant-specific tables - calcite changes (Rajeshbabu)

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/29da79fa
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/29da79fa
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/29da79fa

Branch: refs/heads/calcite
Commit: 29da79fa62a27547aec1d2083144121a6d6fad05
Parents: 4054990
Author: Rajeshbabu Chintaguntla <[email protected]>
Authored: Fri Jan 27 22:06:37 2017 +0530
Committer: Rajeshbabu Chintaguntla <[email protected]>
Committed: Fri Jan 27 22:06:37 2017 +0530

----------------------------------------------------------------------
 .../phoenix/calcite/rel/PhoenixTableModify.java | 95 +++-----------------
 1 file changed, 11 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/29da79fa/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index f13223f..052015e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -6,6 +6,8 @@ import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +31,7 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.compile.UpsertCompiler;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
@@ -112,8 +115,14 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
         final int[] pkSlotIndexes = new int[mappedColumns.size()];
         for (int i = 0; i < columnIndexes.length; i++) {
             PColumn column = mappedColumns.get(i);
+            int pkColPosition = 0;
             if (SchemaUtil.isPKColumn(column)) {
-                pkSlotIndexes[i] = column.getPosition();
+                for (PColumn col : mappedColumns) {
+                    if (col.equals(column)) break;
+                    // PK columns come first in mappedColumns, so this count is the PK slot index.
+                    pkColPosition++;
+                }
+                pkSlotIndexes[i] = pkColPosition;
             }
             columnIndexes[i] = column.getPosition();
         }
@@ -151,55 +160,7 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
             public MutationState execute() throws SQLException {
                 ResultIterator iterator = queryPlan.iterator();
                 // simplest version, no run-on-server, no pipelined update
-                StatementContext childContext = queryPlan.getContext();
-                ConnectionQueryServices services = connection.getQueryServices();
-                int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
-                        QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-                int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-                boolean isAutoCommit = connection.getAutoCommit();
-                byte[][] values = new byte[columnIndexes.length][];
-                int rowCount = 0;
-                Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
-                PTable table = targetTableRef.getTable();
-                try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
-                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                    while (rs.next()) {
-                        for (int i = 0; i < values.length; i++) {
-                            PColumn column = table.getColumns().get(columnIndexes[i]);
-                            byte[] bytes = rs.getBytes(i + 1);
-                            ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
-                            Object value = rs.getObject(i + 1);
-                            int rsPrecision = rs.getMetaData().getPrecision(i + 1);
-                            Integer precision = rsPrecision == 0 ? null : rsPrecision;
-                            int rsScale = rs.getMetaData().getScale(i + 1);
-                            Integer scale = rsScale == 0 ? null : rsScale;
-                            // We are guaranteed that the two column will have compatible types,
-                            // as we checked that before.
-                            if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), SortOrder.getDefault(), precision,
-                                    scale, column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
-                                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                                            .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
-                                            .buildException(); }
-                            column.getDataType().coerceBytes(ptr, value, column.getDataType(),
-                                    precision, scale, SortOrder.getDefault(),
-                                    column.getMaxLength(), column.getScale(), column.getSortOrder(),
-                                    table.rowKeyOrderOptimizable());
-                            values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                        }
-                        // TODO onDupKeyBytes
-                        setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp, null);
-                        rowCount++;
-                        // Commit a batch if auto commit is true and we're at our batch size
-                        if (isAutoCommit && rowCount % batchSize == 0) {
-                            MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
-                            connection.getMutationState().join(state);
-                            connection.getMutationState().send();
-                            mutation.clear();
-                        }
-                    }
-                    // If auto commit is true, this last batch will be committed upon return
-                    return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
-                }
+                return UpsertCompiler.upsertSelect(context, targetTableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, true);
             }
 
             @Override
@@ -216,40 +177,6 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
             }
         }
 
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp, byte[] onDupKeyBytes) {
-        Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
-        byte[][] pkValues = new byte[table.getPKColumns().size()][];
-        // If the table uses salting, the first byte is the salting byte, set to an empty array
-        // here and we will fill in the byte later in PRowImpl.
-        if (table.getBucketNum() != null) {
-            pkValues[0] = new byte[] {0};
-        }
-        Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
-        RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
-        for (int i = 0; i < values.length; i++) {
-            byte[] value = values[i];
-            PColumn column = table.getColumns().get(columnIndexes[i]);
-            if (SchemaUtil.isPKColumn(column)) {
-                pkValues[pkSlotIndex[i]] = value;
-                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
-                    if (!useServerTimestamp) {
-                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
-                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
-                        if (rowTimestamp < 0) {
-                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
-                        }
-                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
-                    }
-                }
-            } else {
-                columnValues.put(column, value);
-            }
-        }
-        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-        table.newKey(ptr, pkValues);
-        mutation.put(ptr, new RowMutationState(columnValues, connection.getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
-    }
-
     private static MutationPlan delete(final PhoenixConnection connection, final PhoenixTable targetTable,
             final TableRef targetTableRef, final QueryPlan queryPlan, final RowProjector projector) {
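
----------------------------------------------------------------------

To make the new slot-index logic easier to follow outside the diff, here is a
minimal, self-contained Java sketch. "Col" is a hypothetical stand-in for
Phoenix's PColumn (its "position" field plays the role of PColumn.getPosition()
and "isPk" the role of SchemaUtil.isPKColumn()), and the tenant-view sample
data is illustrative only; just the counting loop mirrors the commit.

import java.util.Arrays;
import java.util.List;

public class PkSlotIndexSketch {

    // Hypothetical stand-in for PColumn; record equality is value-based.
    record Col(String name, int position, boolean isPk) {}

    static int[] pkSlotIndexes(List<Col> mappedColumns) {
        int[] pkSlotIndexes = new int[mappedColumns.size()];
        for (int i = 0; i < mappedColumns.size(); i++) {
            Col column = mappedColumns.get(i);
            if (column.isPk()) {
                // Count the columns that precede this one; because the PK
                // columns come first in mappedColumns, that count is the PK
                // slot index, independent of the column's table position.
                int pkColPosition = 0;
                for (Col col : mappedColumns) {
                    if (col.equals(column)) break;
                    pkColPosition++;
                }
                pkSlotIndexes[i] = pkColPosition;
            }
        }
        return pkSlotIndexes;
    }

    public static void main(String[] args) {
        // Illustrative tenant-view shape: table positions start at 1 (slot 0
        // of the base table would be the implicit tenant-id column), so the
        // old pkSlotIndexes[i] = column.getPosition() would be off by one,
        // while counting yields the correct 0-based slots.
        List<Col> mapped = List.of(
                new Col("PK1", 1, true),
                new Col("PK2", 2, true),
                new Col("V1", 3, false));
        System.out.println(Arrays.toString(pkSlotIndexes(mapped))); // [0, 1, 0]
    }
}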

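----------------------------------------------------------------------

The removed execute() body implemented an inline auto-commit batching loop;
after this commit that behavior comes from UpsertCompiler.upsertSelect instead.
Below is a generic, self-contained sketch of the same pattern with placeholder
types (String rows and a Consumer sink are not Phoenix APIs); only the control
flow and the rowCount / batchSize * batchSize arithmetic mirror the deleted
code.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchCommitSketch {

    // Buffers rows and flushes a full batch whenever batchSize rows have
    // accumulated, as the deleted while-loop did with MutationState.
    static int upsertAll(List<String> rows, int batchSize, boolean autoCommit,
                         Consumer<List<String>> send) {
        List<String> pending = new ArrayList<>(batchSize);
        int rowCount = 0;
        for (String row : rows) {
            pending.add(row);
            rowCount++;
            // Mirrors: if (isAutoCommit && rowCount % batchSize == 0) send and clear.
            if (autoCommit && rowCount % batchSize == 0) {
                send.accept(new ArrayList<>(pending));
                pending.clear();
            }
        }
        // Mirrors the final rowCount / batchSize * batchSize expression:
        // integer division floors, so this is the number of rows already
        // flushed in full batches; the remainder stays pending for the
        // caller to commit on return.
        return rowCount / batchSize * batchSize;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r1", "r2", "r3", "r4", "r5");
        int flushed = upsertAll(rows, 2, true,
                batch -> System.out.println("send " + batch));
        System.out.println("flushed=" + flushed
                + ", pending=" + (rows.size() - flushed));
        // Output: send [r1, r2], send [r3, r4], flushed=4, pending=1
    }
}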