PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to tenant-specific tables (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d971192c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d971192c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d971192c Branch: refs/heads/encodecolumns2 Commit: d971192c585eef3079f1e348e725df5384e45108 Parents: badb9b4 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Mon Jan 23 15:07:49 2017 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Mon Jan 23 15:07:49 2017 +0530 ---------------------------------------------------------------------- .../apache/phoenix/compile/UpsertCompiler.java | 44 ++++++++++++++------ 1 file changed, 32 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d971192c/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 8837445..32ce6ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -98,6 +98,7 @@ import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PSmallint; import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PUnsignedLong; import org.apache.phoenix.schema.types.PVarbinary; @@ -116,7 +117,7 @@ public class UpsertCompiler { private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr, 
RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer, - byte[][] viewConstants, byte[] onDupKeyBytes) throws SQLException { + byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException { Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the first byte is the salting byte, set to an empty array @@ -124,10 +125,13 @@ public class UpsertCompiler { if (table.getBucketNum() != null) { pkValues[0] = new byte[] {0}; } + for(int i = 0; i < numSplColumns; i++) { + pkValues[i] = values[i]; + } Long rowTimestamp = null; // case when the table doesn't have a row timestamp column RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp); - for (int i = 0; i < values.length; i++) { - byte[] value = values[i]; + for (int i = 0, j = numSplColumns; j < values.length; j++, i++) { + byte[] value = values[j]; PColumn column = table.getColumns().get(columnIndexes[i]); if (SchemaUtil.isPKColumn(column)) { pkValues[pkSlotIndex[i]] = value; @@ -163,8 +167,8 @@ public class UpsertCompiler { mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); } - private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector, - ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp) throws SQLException { + public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector, + ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, boolean prefixSysColValues) throws SQLException { PhoenixStatement statement = childContext.getStatement(); PhoenixConnection connection = statement.getConnection(); 
ConnectionQueryServices services = connection.getQueryServices(); @@ -172,7 +176,23 @@ public class UpsertCompiler { QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); boolean isAutoCommit = connection.getAutoCommit(); - byte[][] values = new byte[columnIndexes.length][]; + int numSplColumns = + (tableRef.getTable().isMultiTenant() ? 1 : 0) + + (tableRef.getTable().getViewIndexId() != null ? 1 : 0); + byte[][] values = new byte[columnIndexes.length + numSplColumns][]; + if(prefixSysColValues) { + int i = 0; + if(tableRef.getTable().isMultiTenant()) { + values[i++] = connection.getTenantId().getBytes(); + } + if(tableRef.getTable().getViewIndexId() != null) { + values[i++] = PSmallint.INSTANCE.toBytes(tableRef.getTable().getViewIndexId()); + } + + for(int k = 0; k < pkSlotIndexes.length; k++) { + pkSlotIndexes[k] += (i + (tableRef.getTable().getBucketNum() != null ? 1 : 0)); + } + } int rowCount = 0; Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); PTable table = tableRef.getTable(); @@ -192,7 +212,7 @@ public class UpsertCompiler { try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); while (rs.next()) { - for (int i = 0; i < values.length; i++) { + for (int i = 0, j = numSplColumns; j < values.length; j++, i++) { PColumn column = table.getColumns().get(columnIndexes[i]); byte[] bytes = rs.getBytes(i + 1); ptr.set(bytes == null ? 
ByteUtil.EMPTY_BYTE_ARRAY : bytes); @@ -212,9 +232,9 @@ public class UpsertCompiler { precision, scale, SortOrder.getDefault(), column.getMaxLength(), column.getScale(), column.getSortOrder(), table.rowKeyOrderOptimizable()); - values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); + values[j] = ByteUtil.copyKeyBytesIfNecessary(ptr); } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null); + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, numSplColumns); rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { @@ -256,7 +276,7 @@ public class UpsertCompiler { StatementContext childContext = new StatementContext(statement, false); // Clone the row projector as it's not thread safe and would be used simultaneously by // multiple threads otherwise. - MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp); + MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp, false); return state; } @@ -798,7 +818,7 @@ public class UpsertCompiler { public MutationState execute() throws SQLException { ResultIterator iterator = queryPlan.iterator(); if (parallelIteratorFactory == null) { - return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp); + return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false); } try { parallelIteratorFactory.setRowProjector(projector); @@ -1043,7 +1063,7 @@ public class UpsertCompiler { indexMaintainer = table.getIndexMaintainer(parentTable, connection); viewConstants = 
IndexUtil.getViewConstants(parentTable); } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes); + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0); return new MutationState(tableRef, mutation, 0, maxSize, connection); }