Repository: phoenix
Updated Branches:
  refs/heads/calcite 405499047 -> 29da79fa6


PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to
tenant-specific tables - calcite changes (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/29da79fa
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/29da79fa
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/29da79fa

Branch: refs/heads/calcite
Commit: 29da79fa62a27547aec1d2083144121a6d6fad05
Parents: 4054990
Author: Rajeshbabu Chintaguntla <[email protected]>
Authored: Fri Jan 27 22:06:37 2017 +0530
Committer: Rajeshbabu Chintaguntla <[email protected]>
Committed: Fri Jan 27 22:06:37 2017 +0530

----------------------------------------------------------------------
 .../phoenix/calcite/rel/PhoenixTableModify.java | 95 +++-----------------
 1 file changed, 11 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/29da79fa/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index f13223f..052015e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -6,6 +6,8 @@ import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +31,7 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.compile.UpsertCompiler;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
@@ -112,8 +115,14 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
             final int[] pkSlotIndexes = new int[mappedColumns.size()];
             for (int i = 0; i < columnIndexes.length; i++) {
                 PColumn column = mappedColumns.get(i);
+                int pkColPosition = 0;
                 if (SchemaUtil.isPKColumn(column)) {
-                    pkSlotIndexes[i] = column.getPosition();
+                    for(PColumn col: mappedColumns) {
+                        if(col.equals(column)) break;
+                        // Since first columns in the mappedColumns are pk columns only.
+                        pkColPosition++;
+                    }
+                    pkSlotIndexes[i] = pkColPosition;
                 }
                 columnIndexes[i] = column.getPosition();
             }
@@ -151,55 +160,7 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
                     // simplest version, no run-on-server, no pipelined update
-                    StatementContext childContext = queryPlan.getContext();
-                    ConnectionQueryServices services = 
connection.getQueryServices();
-                    int maxSize = 
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
-                            QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-                    int batchSize = Math.min(connection.getMutateBatchSize(), 
maxSize);
-                    boolean isAutoCommit = connection.getAutoCommit();
-                    byte[][] values = new byte[columnIndexes.length][];
-                    int rowCount = 0;
-                    Map<ImmutableBytesPtr, RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(batchSize);
-                    PTable table = targetTableRef.getTable();
-                    try (ResultSet rs = new PhoenixResultSet(iterator, 
projector, childContext)) {
-                        ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
-                        while (rs.next()) {
-                            for (int i = 0; i < values.length; i++) {
-                                PColumn column = 
table.getColumns().get(columnIndexes[i]);
-                                byte[] bytes = rs.getBytes(i + 1);
-                                ptr.set(bytes == null ? 
ByteUtil.EMPTY_BYTE_ARRAY : bytes);
-                                Object value = rs.getObject(i + 1);
-                                int rsPrecision = 
rs.getMetaData().getPrecision(i + 1);
-                                Integer precision = rsPrecision == 0 ? null : 
rsPrecision;
-                                int rsScale = rs.getMetaData().getScale(i + 1);
-                                Integer scale = rsScale == 0 ? null : rsScale;
-                                // We are guaranteed that the two column will have compatible types,
-                                // as we checked that before.
-                                if 
(!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), 
SortOrder.getDefault(), precision,
-                                        scale, column.getMaxLength(), 
column.getScale())) { throw new SQLExceptionInfo.Builder(
-                                        
SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                                        .setMessage("value=" + 
column.getDataType().toStringLiteral(ptr, null)).build()
-                                        .buildException(); }
-                                column.getDataType().coerceBytes(ptr, value, 
column.getDataType(), 
-                                        precision, scale, 
SortOrder.getDefault(), 
-                                        column.getMaxLength(), 
column.getScale(), column.getSortOrder(),
-                                        table.rowKeyOrderOptimizable());
-                                values[i] = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
-                            }
-                            // TODO onDupKeyBytes
-                            setValues(values, pkSlotIndexes, columnIndexes, 
table, mutation, connection, useServerTimestamp, null);
-                            rowCount++;
-                            // Commit a batch if auto commit is true and we're at our batch size
-                            if (isAutoCommit && rowCount % batchSize == 0) {
-                                MutationState state = new 
MutationState(targetTableRef, mutation, 0, maxSize, connection);
-                                connection.getMutationState().join(state);
-                                connection.getMutationState().send();
-                                mutation.clear();
-                            }
-                        }
-                        // If auto commit is true, this last batch will be committed upon return
-                        return new MutationState(targetTableRef, mutation, 
rowCount / batchSize * batchSize, maxSize, connection);
-                    }
+                    return UpsertCompiler.upsertSelect(context, 
targetTableRef, projector, iterator, columnIndexes, pkSlotIndexes, 
useServerTimestamp, true);
                 }
 
                 @Override
@@ -216,40 +177,6 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
         }
     }
     
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] 
columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, 
PhoenixConnection connection, boolean useServerTimestamp, byte[] onDupKeyBytes) 
{
-        Map<PColumn,byte[]> columnValues = 
Maps.newHashMapWithExpectedSize(columnIndexes.length);
-        byte[][] pkValues = new byte[table.getPKColumns().size()][];
-        // If the table uses salting, the first byte is the salting byte, set to an empty array
-        // here and we will fill in the byte later in PRowImpl.
-        if (table.getBucketNum() != null) {
-            pkValues[0] = new byte[] {0};
-        }
-        Long rowTimestamp = null; // case when the table doesn't have a row 
timestamp column
-        RowTimestampColInfo rowTsColInfo = new 
RowTimestampColInfo(useServerTimestamp, rowTimestamp);
-        for (int i = 0; i < values.length; i++) {
-            byte[] value = values[i];
-            PColumn column = table.getColumns().get(columnIndexes[i]);
-            if (SchemaUtil.isPKColumn(column)) {
-                pkValues[pkSlotIndex[i]] = value;
-                if (SchemaUtil.getPKPosition(table, column) == 
table.getRowTimestampColPos()) {
-                    if (!useServerTimestamp) {
-                        PColumn rowTimestampCol = 
table.getPKColumns().get(table.getRowTimestampColPos());
-                        rowTimestamp = 
PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
-                        if (rowTimestamp < 0) {
-                            throw new IllegalDataException("Value of a column 
designated as ROW_TIMESTAMP cannot be less than zero");
-                        }
-                        rowTsColInfo = new 
RowTimestampColInfo(useServerTimestamp, rowTimestamp);
-                    } 
-                }
-            } else {
-                columnValues.put(column, value);
-            }
-        }
-        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-        table.newKey(ptr, pkValues);
-        mutation.put(ptr, new RowMutationState(columnValues, 
connection.getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
-    }
-
     private static MutationPlan delete(final PhoenixConnection connection,
             final PhoenixTable targetTable, final TableRef targetTableRef,
             final QueryPlan queryPlan, final RowProjector projector) {

Reply via email to