PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to 
tenant-specific tables (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d971192c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d971192c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d971192c

Branch: refs/heads/encodecolumns2
Commit: d971192c585eef3079f1e348e725df5384e45108
Parents: badb9b4
Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Authored: Mon Jan 23 15:07:49 2017 +0530
Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Committed: Mon Jan 23 15:07:49 2017 +0530

----------------------------------------------------------------------
 .../apache/phoenix/compile/UpsertCompiler.java  | 44 ++++++++++++++------
 1 file changed, 32 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d971192c/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 8837445..32ce6ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PUnsignedLong;
 import org.apache.phoenix.schema.types.PVarbinary;
@@ -116,7 +117,7 @@ public class UpsertCompiler {
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] 
columnIndexes,
             PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
             PhoenixStatement statement, boolean useServerTimestamp, 
IndexMaintainer maintainer,
-            byte[][] viewConstants, byte[] onDupKeyBytes) throws SQLException {
+            byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) 
throws SQLException {
         Map<PColumn,byte[]> columnValues = 
Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set 
to an empty array
@@ -124,10 +125,13 @@ public class UpsertCompiler {
         if (table.getBucketNum() != null) {
             pkValues[0] = new byte[] {0};
         }
+        for(int i = 0; i < numSplColumns; i++) {
+            pkValues[i] = values[i];
+        }
         Long rowTimestamp = null; // case when the table doesn't have a row 
timestamp column
         RowTimestampColInfo rowTsColInfo = new 
RowTimestampColInfo(useServerTimestamp, rowTimestamp);
-        for (int i = 0; i < values.length; i++) {
-            byte[] value = values[i];
+        for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
+            byte[] value = values[j];
             PColumn column = table.getColumns().get(columnIndexes[i]);
             if (SchemaUtil.isPKColumn(column)) {
                 pkValues[pkSlotIndex[i]] = value;
@@ -163,8 +167,8 @@ public class UpsertCompiler {
         mutation.put(ptr, new RowMutationState(columnValues, 
statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, 
onDupKeyBytes));
     }
     
-    private static MutationState upsertSelect(StatementContext childContext, 
TableRef tableRef, RowProjector projector,
-            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, 
boolean useServerTimestamp) throws SQLException {
+    public static MutationState upsertSelect(StatementContext childContext, 
TableRef tableRef, RowProjector projector,
+            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, 
boolean useServerTimestamp, boolean prefixSysColValues) throws SQLException {
         PhoenixStatement statement = childContext.getStatement();
         PhoenixConnection connection = statement.getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -172,7 +176,23 @@ public class UpsertCompiler {
                 QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
         boolean isAutoCommit = connection.getAutoCommit();
-        byte[][] values = new byte[columnIndexes.length][];
+        int numSplColumns =
+                (tableRef.getTable().isMultiTenant() ? 1 : 0)
+                        + (tableRef.getTable().getViewIndexId() != null ? 1 : 
0);
+        byte[][] values = new byte[columnIndexes.length + numSplColumns][];
+        if(prefixSysColValues) {
+            int i = 0;
+            if(tableRef.getTable().isMultiTenant()) {
+                values[i++] = connection.getTenantId().getBytes();
+            }
+            if(tableRef.getTable().getViewIndexId() != null) {
+                values[i++] = 
PSmallint.INSTANCE.toBytes(tableRef.getTable().getViewIndexId());
+            }
+            
+            for(int k = 0; k <  pkSlotIndexes.length; k++) {
+                pkSlotIndexes[k] += (i + (tableRef.getTable().getBucketNum() 
!= null ? 1 : 0));
+            }
+        }
         int rowCount = 0;
         Map<ImmutableBytesPtr, RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(batchSize);
         PTable table = tableRef.getTable();
@@ -192,7 +212,7 @@ public class UpsertCompiler {
         try (ResultSet rs = new PhoenixResultSet(iterator, projector, 
childContext)) {
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             while (rs.next()) {
-                for (int i = 0; i < values.length; i++) {
+                for (int i = 0, j = numSplColumns; j < values.length; j++, 
i++) {
                     PColumn column = table.getColumns().get(columnIndexes[i]);
                     byte[] bytes = rs.getBytes(i + 1);
                     ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
@@ -212,9 +232,9 @@ public class UpsertCompiler {
                             precision, scale, SortOrder.getDefault(), 
                             column.getMaxLength(), column.getScale(), 
column.getSortOrder(),
                             table.rowKeyOrderOptimizable());
-                    values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                    values[j] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, 
mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null);
+                setValues(values, pkSlotIndexes, columnIndexes, table, 
mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, 
numSplColumns);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our 
batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
@@ -256,7 +276,7 @@ public class UpsertCompiler {
             StatementContext childContext = new StatementContext(statement, 
false);
             // Clone the row projector as it's not thread safe and would be 
used simultaneously by
             // multiple threads otherwise.
-            MutationState state = upsertSelect(childContext, tableRef, 
projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, 
useSeverTimestamp);
+            MutationState state = upsertSelect(childContext, tableRef, 
projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, 
useSeverTimestamp, false);
             return state;
         }
         
@@ -798,7 +818,7 @@ public class UpsertCompiler {
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
                     if (parallelIteratorFactory == null) {
-                        return upsertSelect(new StatementContext(statement), 
tableRef, projector, iterator, columnIndexes, pkSlotIndexes, 
useServerTimestamp);
+                        return upsertSelect(new StatementContext(statement), 
tableRef, projector, iterator, columnIndexes, pkSlotIndexes, 
useServerTimestamp, false);
                     }
                     try {
                         parallelIteratorFactory.setRowProjector(projector);
@@ -1043,7 +1063,7 @@ public class UpsertCompiler {
                     indexMaintainer = table.getIndexMaintainer(parentTable, 
connection);
                     viewConstants = IndexUtil.getViewConstants(parentTable);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, 
mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, 
onDupKeyBytes);
+                setValues(values, pkSlotIndexes, columnIndexes, table, 
mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, 
onDupKeyBytes, 0);
                 return new MutationState(tableRef, mutation, 0, maxSize, 
connection);
             }
 

Reply via email to