PHOENIX-4342 - Surface QueryPlan in MutationPlan

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1d8a6bc3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1d8a6bc3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1d8a6bc3

Branch: refs/heads/system-catalog
Commit: 1d8a6bc3a6a277d9e3201066b753fa9fd7018545
Parents: 2a8e1c7
Author: Geoffrey Jacoby <gjac...@apache.org>
Authored: Thu Nov 2 13:41:02 2017 -0700
Committer: Geoffrey Jacoby <gjac...@apache.org>
Committed: Mon Nov 13 11:47:50 2017 -0800

----------------------------------------------------------------------
 .../phoenix/compile/BaseMutationPlan.java       |   5 +
 .../phoenix/compile/DelegateMutationPlan.java   |   5 +
 .../apache/phoenix/compile/DeleteCompiler.java  | 545 ++++++++-------
 .../apache/phoenix/compile/MutationPlan.java    |   5 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 675 +++++++++++--------
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   9 +-
 6 files changed, 733 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
index 0e45682..60eb59a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
@@ -79,4 +79,9 @@ public abstract class BaseMutationPlan implements MutationPlan {
         return 0l;
     }
 
+    @Override
+    public QueryPlan getQueryPlan() {
+        return null;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
index 343ec32..90eef61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
@@ -42,6 +42,11 @@ public class DelegateMutationPlan implements MutationPlan {
     }
 
     @Override
+    public QueryPlan getQueryPlan() {
+        return plan.getQueryPlan();
+    }
+
+    @Override
     public ParameterMetaData getParameterMetaData() {
         return plan.getParameterMetaData();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f038cda..8d9a5b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -303,14 +303,16 @@ public class DeleteCompiler {
         return Collections.emptyList();
     }
     
-    private class MultiDeleteMutationPlan implements MutationPlan {
+    private class MultiRowDeleteMutationPlan implements MutationPlan {
         private final List<MutationPlan> plans;
         private final MutationPlan firstPlan;
-        
-        public MultiDeleteMutationPlan(@NotNull List<MutationPlan> plans) {
+        private final QueryPlan dataPlan;
+
+        public MultiRowDeleteMutationPlan(QueryPlan dataPlan, @NotNull 
List<MutationPlan> plans) {
             Preconditions.checkArgument(!plans.isEmpty());
             this.plans = plans;
             this.firstPlan = plans.get(0);
+            this.dataPlan = dataPlan;
         }
         
         @Override
@@ -348,8 +350,8 @@ public class DeleteCompiler {
             return firstPlan.getSourceRefs();
         }
 
-               @Override
-               public Operation getOperation() {
+                   @Override
+                   public Operation getOperation() {
                        return operation;
                }
 
@@ -401,6 +403,11 @@ public class DeleteCompiler {
             }
             return estInfoTimestamp;
         }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return dataPlan;
+        }
     }
 
     public MutationPlan compile(DeleteStatement delete) throws SQLException {
@@ -548,69 +555,9 @@ public class DeleteCompiler {
             List<MutationPlan> mutationPlans = 
Lists.newArrayListWithExpectedSize(queryPlans.size());
             for (final QueryPlan plan : queryPlans) {
                 final StatementContext context = plan.getContext();
-                mutationPlans.add(new MutationPlan() {
-    
-                    @Override
-                    public ParameterMetaData getParameterMetaData() {
-                        return context.getBindManager().getParameterMetaData();
-                    }
-    
-                    @Override
-                    public MutationState execute() throws SQLException {
-                        // We have a point lookup, so we know we have a simple 
set of fully qualified
-                        // keys for our ranges
-                        ScanRanges ranges = context.getScanRanges();
-                        Iterator<KeyRange> iterator = 
ranges.getPointLookupKeyIterator(); 
-                        Map<ImmutableBytesPtr,RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
-                        while (iterator.hasNext()) {
-                            mutation.put(new 
ImmutableBytesPtr(iterator.next().getLowerRange()), new 
RowMutationState(PRow.DELETE_MARKER, 
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO, null));
-                        }
-                        return new MutationState(plan.getTableRef(), mutation, 
0, maxSize, maxSizeBytes, connection);
-                    }
-    
-                    @Override
-                    public ExplainPlan getExplainPlan() throws SQLException {
-                        return new 
ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
-                    }
-    
-                    @Override
-                    public StatementContext getContext() {
-                        return context;
-                    }
-
-                    @Override
-                    public TableRef getTargetRef() {
-                        return dataPlan.getTableRef();
-                    }
-
-                    @Override
-                    public Set<TableRef> getSourceRefs() {
-                        // Don't include the target
-                        return Collections.emptySet();
-                    }
-
-                       @Override
-                       public Operation getOperation() {
-                               return operation;
-                       }
-
-                    @Override
-                    public Long getEstimatedRowsToScan() throws SQLException {
-                        return 0l;
-                    }
-
-                    @Override
-                    public Long getEstimatedBytesToScan() throws SQLException {
-                        return 0l;
-                    }
-
-                    @Override
-                    public Long getEstimateInfoTimestamp() throws SQLException 
{
-                        return 0l;
-                    }
-                });
+                mutationPlans.add(new SingleRowDeleteMutationPlan(plan, 
connection, maxSize, maxSizeBytes));
             }
-            return new MultiDeleteMutationPlan(mutationPlans);
+            return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans);
         } else if (runOnServer) {
             // TODO: better abstraction
             final StatementContext context = dataPlan.getContext();
@@ -629,91 +576,7 @@ public class DeleteCompiler {
             final RowProjector projector = projectorToBe;
             final QueryPlan aggPlan = new AggregatePlan(context, select, 
dataPlan.getTableRef(), projector, null, null,
                     OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, 
null);
-            return new MutationPlan() {
-                        @Override
-                        public ParameterMetaData getParameterMetaData() {
-                            return 
context.getBindManager().getParameterMetaData();
-                        }
-        
-                        @Override
-                        public StatementContext getContext() {
-                            return context;
-                        }
-        
-                        @Override
-                        public TableRef getTargetRef() {
-                            return dataPlan.getTableRef();
-                        }
-    
-                        @Override
-                        public Set<TableRef> getSourceRefs() {
-                            return dataPlan.getSourceRefs();
-                        }
-    
-                               @Override
-                               public Operation getOperation() {
-                                       return operation;
-                               }
-    
-                        @Override
-                        public MutationState execute() throws SQLException {
-                            // TODO: share this block of code with UPSERT 
SELECT
-                            ImmutableBytesWritable ptr = context.getTempPtr();
-                            PTable table = dataPlan.getTableRef().getTable();
-                            table.getIndexMaintainers(ptr, 
context.getConnection());
-                            byte[] txState = table.isTransactional() ? 
connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-                            ServerCache cache = null;
-                            try {
-                                if (ptr.getLength() > 0) {
-                                    byte[] uuidValue = 
ServerCacheClient.generateId();
-                                    
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                                    
context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                                }
-                                ResultIterator iterator = aggPlan.iterator();
-                                try {
-                                    Tuple row = iterator.next();
-                                    final long mutationCount = 
(Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
-                                    return new MutationState(maxSize, 
maxSizeBytes, connection) {
-                                        @Override
-                                        public long getUpdateCount() {
-                                            return mutationCount;
-                                        }
-                                    };
-                                } finally {
-                                    iterator.close();
-                                }
-                            } finally {
-                                if (cache != null) {
-                                    cache.close();
-                                }
-                            }
-                        }
-        
-                        @Override
-                        public ExplainPlan getExplainPlan() throws 
SQLException {
-                            List<String> queryPlanSteps =  
aggPlan.getExplainPlan().getPlanSteps();
-                            List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                            planSteps.add("DELETE ROWS");
-                            planSteps.addAll(queryPlanSteps);
-                            return new ExplainPlan(planSteps);
-                        }
-    
-                        @Override
-                        public Long getEstimatedRowsToScan() throws 
SQLException {
-                            return aggPlan.getEstimatedRowsToScan();
-                        }
-    
-                        @Override
-                        public Long getEstimatedBytesToScan() throws 
SQLException {
-                            return aggPlan.getEstimatedBytesToScan();
-                        }
-    
-                        @Override
-                        public Long getEstimateInfoTimestamp() throws 
SQLException {
-                            return aggPlan.getEstimateInfoTimestamp();
-                        }
-                    };
+            return new ServerSelectDeleteMutationPlan(dataPlan, connection, 
aggPlan, projector, maxSize, maxSizeBytes);
         } else {
             final DeletingParallelIteratorFactory parallelIteratorFactory = 
parallelIteratorFactoryToBe;
             List<PColumn> adjustedProjectedColumns = 
Lists.newArrayListWithExpectedSize(projectedColumns.size());
@@ -749,90 +612,322 @@ public class DeleteCompiler {
             if 
(!bestPlan.getTableRef().getTable().equals(targetTableRef.getTable())) {
                 otherTableRefs.add(projectedTableRef);
             }
-            final StatementContext context = bestPlan.getContext();
-            return new MutationPlan() {
-                @Override
-                public ParameterMetaData getParameterMetaData() {
-                    return context.getBindManager().getParameterMetaData();
-                }
+            return new ClientSelectDeleteMutationPlan(targetTableRef, 
dataPlan, bestPlan, hasPreOrPostProcessing,
+                    parallelIteratorFactory, otherTableRefs, 
projectedTableRef, maxSize, maxSizeBytes, connection);
+        }
+    }
 
-                @Override
-                public StatementContext getContext() {
-                    return context;
-                }
+    private class SingleRowDeleteMutationPlan implements MutationPlan {
 
-                @Override
-                public TableRef getTargetRef() {
-                    return targetTableRef;
-                }
+        private final QueryPlan dataPlan;
+        private final PhoenixConnection connection;
+        private final int maxSize;
+        private final StatementContext context;
+        private final int maxSizeBytes;
 
-                @Override
-                public Set<TableRef> getSourceRefs() {
-                    return dataPlan.getSourceRefs();
-                }
+        public SingleRowDeleteMutationPlan(QueryPlan dataPlan, 
PhoenixConnection connection, int maxSize, int maxSizeBytes) {
+            this.dataPlan = dataPlan;
+            this.connection = connection;
+            this.maxSize = maxSize;
+            this.context = dataPlan.getContext();
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
 
-                       @Override
-                       public Operation getOperation() {
-                               return operation;
-                       }
-
-                @Override
-                public MutationState execute() throws SQLException {
-                    ResultIterator iterator = bestPlan.iterator();
-                    try {
-                        if (!hasPreOrPostProcessing) {
-                            Tuple tuple;
-                            long totalRowCount = 0;
-                            if (parallelIteratorFactory != null) {
-                                parallelIteratorFactory.setQueryPlan(bestPlan);
-                                
parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
-                                
parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
-                            }
-                            while ((tuple=iterator.next()) != null) {// Runs 
query
-                                Cell kv = tuple.getValue(0);
-                                totalRowCount += 
PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), 
SortOrder.getDefault());
-                            }
-                            // Return total number of rows that have been 
deleted from the table. In the case of auto commit being off
-                            // the mutations will all be in the mutation state 
of the current connection. We need to divide by the
-                            // total number of tables we updated as otherwise 
the client will get an unexpected result
-                            MutationState state = new MutationState(maxSize, 
maxSizeBytes, connection, totalRowCount / 
((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && 
!otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
-
-                            // set the read metrics accumulated in the parent 
context so that it can be published when the mutations are committed.
-                            
state.setReadMetricQueue(context.getReadMetricsQueue());
-
-                            return state;
-                        } else {
-                            return deleteRows(context, iterator, bestPlan, 
projectedTableRef, otherTableRefs);
+        @Override
+        public MutationState execute() throws SQLException {
+            // We have a point lookup, so we know we have a simple set of 
fully qualified
+            // keys for our ranges
+            ScanRanges ranges = context.getScanRanges();
+            Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
+            Map<ImmutableBytesPtr,RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+            while (iterator.hasNext()) {
+                mutation.put(new 
ImmutableBytesPtr(iterator.next().getLowerRange()),
+                        new RowMutationState(PRow.DELETE_MARKER,
+                                
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO, null));
+            }
+            return new MutationState(dataPlan.getTableRef(), mutation, 0, 
maxSize, maxSizeBytes, connection);
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            return new ExplainPlan(Collections.singletonList("DELETE SINGLE 
ROW"));
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return dataPlan;
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return dataPlan.getTableRef();
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            // Don't include the target
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return 0l;
+        }
+    }
+
+    private class ServerSelectDeleteMutationPlan implements MutationPlan {
+        private final StatementContext context;
+        private final QueryPlan dataPlan;
+        private final PhoenixConnection connection;
+        private final QueryPlan aggPlan;
+        private final RowProjector projector;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ServerSelectDeleteMutationPlan(QueryPlan dataPlan, 
PhoenixConnection connection, QueryPlan aggPlan,
+                                              RowProjector projector, int 
maxSize, int maxSizeBytes) {
+            this.context = dataPlan.getContext();
+            this.dataPlan = dataPlan;
+            this.connection = connection;
+            this.aggPlan = aggPlan;
+            this.projector = projector;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return dataPlan.getTableRef();
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return dataPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            // TODO: share this block of code with UPSERT SELECT
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            PTable table = dataPlan.getTableRef().getTable();
+            table.getIndexMaintainers(ptr, context.getConnection());
+            byte[] txState = table.isTransactional() ? 
connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+            ServerCache cache = null;
+            try {
+                if (ptr.getLength() > 0) {
+                    byte[] uuidValue = ServerCacheClient.generateId();
+                    
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                    
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                    
context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                }
+                ResultIterator iterator = aggPlan.iterator();
+                try {
+                    Tuple row = iterator.next();
+                    final long mutationCount = (Long) 
projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+                    return new MutationState(maxSize, maxSizeBytes, 
connection) {
+                        @Override
+                        public long getUpdateCount() {
+                            return mutationCount;
                         }
-                    } finally {
-                        iterator.close();
-                    }
+                    };
+                } finally {
+                    iterator.close();
                 }
-
-                @Override
-                public ExplainPlan getExplainPlan() throws SQLException {
-                    List<String> queryPlanSteps =  
bestPlan.getExplainPlan().getPlanSteps();
-                    List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                    planSteps.add("DELETE ROWS");
-                    planSteps.addAll(queryPlanSteps);
-                    return new ExplainPlan(planSteps);
+            } finally {
+                if (cache != null) {
+                    cache.close();
                 }
+            }
+        }
 
-                @Override
-                public Long getEstimatedRowsToScan() throws SQLException {
-                    return bestPlan.getEstimatedRowsToScan();
-                }
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  
aggPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("DELETE ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
 
-                @Override
-                public Long getEstimatedBytesToScan() throws SQLException {
-                    return bestPlan.getEstimatedBytesToScan();
-                }
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return aggPlan.getEstimatedRowsToScan();
+        }
 
-                @Override
-                public Long getEstimateInfoTimestamp() throws SQLException {
-                    return bestPlan.getEstimateInfoTimestamp();
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return aggPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return aggPlan.getEstimateInfoTimestamp();
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return aggPlan;
+        }
+    }
+
+    private class ClientSelectDeleteMutationPlan implements MutationPlan {
+        private final StatementContext context;
+        private final TableRef targetTableRef;
+        private final QueryPlan dataPlan;
+        private final QueryPlan bestPlan;
+        private final boolean hasPreOrPostProcessing;
+        private final DeletingParallelIteratorFactory parallelIteratorFactory;
+        private final List<TableRef> otherTableRefs;
+        private final TableRef projectedTableRef;
+        private final int maxSize;
+        private final int maxSizeBytes;
+        private final PhoenixConnection connection;
+
+        public ClientSelectDeleteMutationPlan(TableRef targetTableRef, 
QueryPlan dataPlan, QueryPlan bestPlan,
+                                              boolean hasPreOrPostProcessing,
+                                              DeletingParallelIteratorFactory 
parallelIteratorFactory,
+                                              List<TableRef> otherTableRefs, 
TableRef projectedTableRef, int maxSize,
+                                              int maxSizeBytes, 
PhoenixConnection connection) {
+            this.context = bestPlan.getContext();
+            this.targetTableRef = targetTableRef;
+            this.dataPlan = dataPlan;
+            this.bestPlan = bestPlan;
+            this.hasPreOrPostProcessing = hasPreOrPostProcessing;
+            this.parallelIteratorFactory = parallelIteratorFactory;
+            this.otherTableRefs = otherTableRefs;
+            this.projectedTableRef = projectedTableRef;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+            this.connection = connection;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return targetTableRef;
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return dataPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ResultIterator iterator = bestPlan.iterator();
+            try {
+                if (!hasPreOrPostProcessing) {
+                    Tuple tuple;
+                    long totalRowCount = 0;
+                    if (parallelIteratorFactory != null) {
+                        parallelIteratorFactory.setQueryPlan(bestPlan);
+                        
parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
+                        
parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
+                    }
+                    while ((tuple=iterator.next()) != null) {// Runs query
+                        Cell kv = tuple.getValue(0);
+                        totalRowCount += 
PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), 
SortOrder.getDefault());
+                    }
+                    // Return total number of rows that have been deleted from 
the table. In the case of auto commit being off
+                    // the mutations will all be in the mutation state of the 
current connection. We need to divide by the
+                    // total number of tables we updated as otherwise the 
client will get an unexpected result
+                    MutationState state = new MutationState(maxSize, 
maxSizeBytes, connection,
+                            totalRowCount /
+                                    
((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && 
!otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
+
+                    // set the read metrics accumulated in the parent context 
so that it can be published when the mutations are committed.
+                    state.setReadMetricQueue(context.getReadMetricsQueue());
+
+                    return state;
+                } else {
+                    return deleteRows(context, iterator, bestPlan, 
projectedTableRef, otherTableRefs);
                 }
-            };
+            } finally {
+                iterator.close();
+            }
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  
bestPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("DELETE ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return bestPlan.getEstimatedRowsToScan();
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return bestPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return bestPlan.getEstimateInfoTimestamp();
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return bestPlan;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
index ddc2004..97f3f3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
@@ -24,6 +24,7 @@ import org.apache.phoenix.schema.TableRef;
 
 
 public interface MutationPlan extends StatementPlan {
-    public MutationState execute() throws SQLException;
-    public TableRef getTargetRef();
+    MutationState execute() throws SQLException;
+    TableRef getTargetRef();
+    QueryPlan getQueryPlan();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 6445894..3603ce7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -694,173 +694,13 @@ public class UpsertCompiler {
                     
                     // Ignore order by - it has no impact
                     final QueryPlan aggPlan = new AggregatePlan(context, 
select, statementContext.getCurrentTable(), aggProjector, null,null, 
OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
-                    return new MutationPlan() {
-                        @Override
-                        public ParameterMetaData getParameterMetaData() {
-                            return 
queryPlan.getContext().getBindManager().getParameterMetaData();
-                        }
-    
-                        @Override
-                        public StatementContext getContext() {
-                            return queryPlan.getContext();
-                        }
-
-                        @Override
-                        public TableRef getTargetRef() {
-                            return tableRef;
-                        }
-
-                        @Override
-                        public Set<TableRef> getSourceRefs() {
-                            return originalQueryPlan.getSourceRefs();
-                        }
-
-                               @Override
-                               public Operation getOperation() {
-                                       return operation;
-                               }
-
-                        @Override
-                        public MutationState execute() throws SQLException {
-                            ImmutableBytesWritable ptr = context.getTempPtr();
-                            PTable table = tableRef.getTable();
-                            table.getIndexMaintainers(ptr, 
context.getConnection());
-                            byte[] txState = table.isTransactional() ? 
connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-
-                            if (ptr.getLength() > 0) {
-                                byte[] uuidValue = 
ServerCacheClient.generateId();
-                                
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                                
scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                            }
-                            ResultIterator iterator = aggPlan.iterator();
-                            try {
-                                Tuple row = iterator.next();
-                                final long mutationCount = 
(Long)aggProjector.getColumnProjector(0).getValue(row,
-                                        PLong.INSTANCE, ptr);
-                                return new MutationState(maxSize, 
maxSizeBytes, connection) {
-                                    @Override
-                                    public long getUpdateCount() {
-                                        return mutationCount;
-                                    }
-                                };
-                            } finally {
-                                iterator.close();
-                            }
-                            
-                        }
-
-                        @Override
-                        public ExplainPlan getExplainPlan() throws 
SQLException {
-                            List<String> queryPlanSteps =  
aggPlan.getExplainPlan().getPlanSteps();
-                            List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                            planSteps.add("UPSERT ROWS");
-                            planSteps.addAll(queryPlanSteps);
-                            return new ExplainPlan(planSteps);
-                        }
-
-                        @Override
-                        public Long getEstimatedRowsToScan() throws 
SQLException {
-                            return aggPlan.getEstimatedRowsToScan();
-                        }
-
-                        @Override
-                        public Long getEstimatedBytesToScan() throws 
SQLException {
-                            return aggPlan.getEstimatedBytesToScan();
-                        }
-
-                        @Override
-                        public Long getEstimateInfoTimestamp() throws 
SQLException {
-                            return aggPlan.getEstimateInfoTimestamp();
-                        }
-                    };
+                    return new ServerUpsertSelectMutationPlan(queryPlan, 
tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, 
maxSize, maxSizeBytes);
                 }
             }
             
////////////////////////////////////////////////////////////////////
             // UPSERT SELECT run client-side
             
/////////////////////////////////////////////////////////////////////
-            return new MutationPlan() {
-                @Override
-                public ParameterMetaData getParameterMetaData() {
-                    return 
queryPlan.getContext().getBindManager().getParameterMetaData();
-                }
-
-                @Override
-                public StatementContext getContext() {
-                    return queryPlan.getContext();
-                }
-
-                @Override
-                public TableRef getTargetRef() {
-                    return tableRef;
-                }
-
-                @Override
-                public Set<TableRef> getSourceRefs() {
-                    return originalQueryPlan.getSourceRefs();
-                }
-
-                       @Override
-                       public Operation getOperation() {
-                               return operation;
-                       }
-
-                @Override
-                public MutationState execute() throws SQLException {
-                    ResultIterator iterator = queryPlan.iterator();
-                    if (parallelIteratorFactory == null) {
-                        return upsertSelect(new StatementContext(statement), 
tableRef, projector, iterator, columnIndexes, pkSlotIndexes, 
useServerTimestamp, false);
-                    }
-                    try {
-                        parallelIteratorFactory.setRowProjector(projector);
-                        
parallelIteratorFactory.setColumnIndexes(columnIndexes);
-                        
parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
-                        Tuple tuple;
-                        long totalRowCount = 0;
-                        StatementContext context = queryPlan.getContext();
-                        while ((tuple=iterator.next()) != null) {// Runs query
-                            Cell kv = tuple.getValue(0);
-                            totalRowCount += 
PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), 
SortOrder.getDefault());
-                        }
-                        // Return total number of rows that have been updated. 
In the case of auto commit being off
-                        // the mutations will all be in the mutation state of 
the current connection.
-                        MutationState mutationState = new 
MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
-                        /*
-                         *  All the metrics collected for measuring the reads 
done by the parallel mutating iterators
-                         *  is included in the ReadMetricHolder of the 
statement context. Include these metrics in the
-                         *  returned mutation state so they can be published 
on commit. 
-                         */
-                        
mutationState.setReadMetricQueue(context.getReadMetricsQueue());
-                        return mutationState; 
-                    } finally {
-                        iterator.close();
-                    }
-                }
-
-                @Override
-                public ExplainPlan getExplainPlan() throws SQLException {
-                    List<String> queryPlanSteps =  
queryPlan.getExplainPlan().getPlanSteps();
-                    List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                    planSteps.add("UPSERT SELECT");
-                    planSteps.addAll(queryPlanSteps);
-                    return new ExplainPlan(planSteps);
-                }
-
-                @Override
-                public Long getEstimatedRowsToScan() throws SQLException {
-                    return queryPlan.getEstimatedRowsToScan();
-                }
-
-                @Override
-                public Long getEstimatedBytesToScan() throws SQLException {
-                    return queryPlan.getEstimatedBytesToScan();
-                }
-
-                @Override
-                public Long getEstimateInfoTimestamp() throws SQLException {
-                    return queryPlan.getEstimateInfoTimestamp();
-                }
-            };
+            return new ClientUpsertSelectMutationPlan(queryPlan, tableRef, 
originalQueryPlan, parallelIteratorFactory, projector, columnIndexes, 
pkSlotIndexes, useServerTimestamp, maxSize, maxSizeBytes);
         }
 
             
@@ -986,124 +826,9 @@ public class UpsertCompiler {
         }
         final byte[] onDupKeyBytes = onDupKeyBytesToBe;
         
-        return new MutationPlan() {
-            @Override
-            public ParameterMetaData getParameterMetaData() {
-                return context.getBindManager().getParameterMetaData();
-            }
-
-            @Override
-            public StatementContext getContext() {
-                return context;
-            }
-
-            @Override
-            public TableRef getTargetRef() {
-                return tableRef;
-            }
-
-            @Override
-            public Set<TableRef> getSourceRefs() {
-                return Collections.emptySet();
-            }
-
-               @Override
-               public Operation getOperation() {
-                       return operation;
-               }
-
-            @Override
-            public MutationState execute() throws SQLException {
-                ImmutableBytesWritable ptr = context.getTempPtr();
-                final SequenceManager sequenceManager = 
context.getSequenceManager();
-                // Next evaluate all the expressions
-                int nodeIndex = nodeIndexOffset;
-                PTable table = tableRef.getTable();
-                Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
-                    sequenceManager.newSequenceTuple(null);
-                for (Expression constantExpression : constantExpressions) {
-                    PColumn column = allColumns.get(columnIndexes[nodeIndex]);
-                    constantExpression.evaluate(tuple, ptr);
-                    Object value = null;
-                    if (constantExpression.getDataType() != null) {
-                        value = constantExpression.getDataType().toObject(ptr, 
constantExpression.getSortOrder(), constantExpression.getMaxLength(), 
constantExpression.getScale());
-                        if 
(!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) 
{ 
-                            throw TypeMismatchException.newException(
-                                constantExpression.getDataType(), 
column.getDataType(), "expression: "
-                                        + constantExpression.toString() + " in 
column " + column);
-                        }
-                        if (!column.getDataType().isSizeCompatible(ptr, value, 
constantExpression.getDataType(),
-                                constantExpression.getSortOrder(), 
constantExpression.getMaxLength(), 
-                                constantExpression.getScale(), 
column.getMaxLength(), column.getScale())) { 
-                            throw new SQLExceptionInfo.Builder(
-                                
SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                                .setMessage("value=" + 
constantExpression.toString()).build().buildException();
-                        }
-                    }
-                    column.getDataType().coerceBytes(ptr, value, 
constantExpression.getDataType(), 
-                            constantExpression.getMaxLength(), 
constantExpression.getScale(), constantExpression.getSortOrder(),
-                            column.getMaxLength(), 
column.getScale(),column.getSortOrder(),
-                            table.rowKeyOrderOptimizable());
-                    if (overlapViewColumns.contains(column) && 
Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), 
column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
-                        throw new SQLExceptionInfo.Builder(
-                                SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
-                                .setColumnName(column.getName().getString())
-                                .setMessage("value=" + 
constantExpression.toString()).build().buildException();
-                    }
-                    values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                    nodeIndex++;
-                }
-                // Add columns based on view
-                for (PColumn column : addViewColumns) {
-                    if (IndexUtil.getViewConstantValue(column, ptr)) {
-                        values[nodeIndex++] = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
-                    } else {
-                        throw new IllegalStateException();
-                    }
-                }
-                Map<ImmutableBytesPtr, RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(1);
-                IndexMaintainer indexMaintainer = null;
-                byte[][] viewConstants = null;
-                if (table.getIndexType() == IndexType.LOCAL) {
-                    PTable parentTable =
-                            statement
-                                    .getConnection()
-                                    .getMetaDataCache()
-                                    .getTableRef(
-                                        new 
PTableKey(statement.getConnection().getTenantId(),
-                                                
table.getParentName().getString())).getTable();
-                    indexMaintainer = table.getIndexMaintainer(parentTable, 
connection);
-                    viewConstants = IndexUtil.getViewConstants(parentTable);
-                }
-                setValues(values, pkSlotIndexes, columnIndexes, table, 
mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, 
onDupKeyBytes, 0);
-                return new MutationState(tableRef, mutation, 0, maxSize, 
maxSizeBytes, connection);
-            }
-
-            @Override
-            public ExplainPlan getExplainPlan() throws SQLException {
-                List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
-                if (context.getSequenceManager().getSequenceCount() > 0) {
-                    planSteps.add("CLIENT RESERVE " + 
context.getSequenceManager().getSequenceCount() + " SEQUENCES");
-                }
-                planSteps.add("PUT SINGLE ROW");
-                return new ExplainPlan(planSteps);
-            }
-
-            @Override
-            public Long getEstimatedRowsToScan() throws SQLException {
-                return 0l;
-            }
-
-            @Override
-            public Long getEstimatedBytesToScan() throws SQLException {
-                return 0l;
-            }
-
-            @Override
-            public Long getEstimateInfoTimestamp() throws SQLException {
-                return 0l;
-            }
-        };
+        return new UpsertValuesMutationPlan(context, tableRef, 
nodeIndexOffset, constantExpressions,
+                allColumns, columnIndexes, overlapViewColumns, values, 
addViewColumns,
+                connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, 
maxSize, maxSizeBytes);
     }
     
     private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable 
table) {
@@ -1214,4 +939,394 @@ public class UpsertCompiler {
             }
         }
     }
+
+    private class ServerUpsertSelectMutationPlan implements MutationPlan { // UPSERT SELECT plan that runs aggPlan and reports its single Long result as the update count
+        private final QueryPlan queryPlan; // source of getContext() and getParameterMetaData()
+        private final TableRef tableRef; // returned by getTargetRef(); its table drives the index/transaction setup in execute()
+        private final QueryPlan originalQueryPlan; // only used for getSourceRefs()
+        private final StatementContext context; // supplies the temp pointer and connection used in execute()
+        private final PhoenixConnection connection;
+        private final Scan scan; // receives the index/transaction attributes set in execute()
+        private final QueryPlan aggPlan; // the plan actually iterated; also backs the explain output and estimates
+        private final RowProjector aggProjector; // its column 0 is read as the mutation count
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ServerUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef 
tableRef, QueryPlan originalQueryPlan,
+                                              StatementContext context, 
PhoenixConnection connection,
+                                              Scan scan, QueryPlan aggPlan, 
RowProjector aggProjector,
+                                              int maxSize, int maxSizeBytes) { // plain field assignment, no validation
+            this.queryPlan = queryPlan;
+            this.tableRef = tableRef;
+            this.originalQueryPlan = originalQueryPlan;
+            this.context = context;
+            this.connection = connection;
+            this.scan = scan;
+            this.aggPlan = aggPlan;
+            this.aggProjector = aggProjector;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return 
queryPlan.getContext().getBindManager().getParameterMetaData(); // bind metadata comes from queryPlan's context
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return queryPlan.getContext(); // note: queryPlan's context, whereas execute() uses the separately supplied 'context' field
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return aggPlan; // exposes the aggregate plan that execute() runs, not queryPlan
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return originalQueryPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation; // not declared in this class; resolved from the enclosing scope
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            PTable table = tableRef.getTable();
+            table.getIndexMaintainers(ptr, context.getConnection()); // result is delivered through ptr, which is inspected below
+            byte[] txState = table.isTransactional() ?
+                    connection.getMutationState().encodeTransaction() : 
ByteUtil.EMPTY_BYTE_ARRAY; // empty transaction state for non-transactional tables
+
+            if (ptr.getLength() > 0) { // only when getIndexMaintainers put something in ptr
+                byte[] uuidValue = ServerCacheClient.generateId();
+                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+            }
+            ResultIterator iterator = aggPlan.iterator();
+            try {
+                Tuple row = iterator.next(); // only the first row of the aggregate plan is read
+                final long mutationCount = (Long) 
aggProjector.getColumnProjector(0).getValue(row,
+                        PLong.INSTANCE, ptr); // NOTE(review): unboxing to long means a null value here would throw NullPointerException
+                return new MutationState(maxSize, maxSizeBytes, connection) {
+                    @Override
+                    public long getUpdateCount() { // reports the count read from the aggregate plan
+                        return mutationCount;
+                    }
+                };
+            } finally {
+                iterator.close(); // closed whether or not reading the count succeeded
+            }
+
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  
aggPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); // +1 for the header step added next
+            planSteps.add("UPSERT ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return aggPlan.getEstimatedRowsToScan(); // estimates are delegated to the aggregate plan
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return aggPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return aggPlan.getEstimateInfoTimestamp();
+        }
+    }
+
+    private class UpsertValuesMutationPlan implements MutationPlan { // plan that evaluates the given expressions and builds one row's mutation; it wraps no query
+        private final StatementContext context;
+        private final TableRef tableRef;
+        private final int nodeIndexOffset; // first slot of values[]/columnIndexes[] written by execute()
+        private final List<Expression> constantExpressions; // evaluated in order, one per target column
+        private final List<PColumn> allColumns; // indexed through columnIndexes to find each target column
+        private final int[] columnIndexes;
+        private final Set<PColumn> overlapViewColumns; // columns whose supplied value must equal the view constant
+        private final byte[][] values; // output buffer filled by execute()
+        private final Set<PColumn> addViewColumns; // columns whose value is taken from the view constant
+        private final PhoenixConnection connection;
+        private final int[] pkSlotIndexes;
+        private final boolean useServerTimestamp;
+        private final byte[] onDupKeyBytes;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public UpsertValuesMutationPlan(StatementContext context, TableRef 
tableRef, int nodeIndexOffset,
+                                        List<Expression> constantExpressions, 
List<PColumn> allColumns,
+                                        int[] columnIndexes, Set<PColumn> 
overlapViewColumns, byte[][] values,
+                                        Set<PColumn> addViewColumns, 
PhoenixConnection connection,
+                                        int[] pkSlotIndexes, boolean 
useServerTimestamp, byte[] onDupKeyBytes,
+                                        int maxSize, int maxSizeBytes) { // plain field assignment; arrays are stored by reference, not copied
+            this.context = context;
+            this.tableRef = tableRef;
+            this.nodeIndexOffset = nodeIndexOffset;
+            this.constantExpressions = constantExpressions;
+            this.allColumns = allColumns;
+            this.columnIndexes = columnIndexes;
+            this.overlapViewColumns = overlapViewColumns;
+            this.values = values;
+            this.addViewColumns = addViewColumns;
+            this.connection = connection;
+            this.pkSlotIndexes = pkSlotIndexes;
+            this.useServerTimestamp = useServerTimestamp;
+            this.onDupKeyBytes = onDupKeyBytes;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return null; // this plan has no underlying query, so callers must handle null
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return Collections.emptySet(); // no source tables are read
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation; // not declared in this class; resolved from the enclosing scope
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ImmutableBytesWritable ptr = context.getTempPtr(); // scratch pointer reused for every expression below
+            final SequenceManager sequenceManager = 
context.getSequenceManager();
+            // Next evaluate all the expressions
+            int nodeIndex = nodeIndexOffset;
+            PTable table = tableRef.getTable();
+            Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
+                sequenceManager.newSequenceTuple(null); // tuple stays null when the statement uses no sequences
+            for (Expression constantExpression : constantExpressions) {
+                PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+                constantExpression.evaluate(tuple, ptr);
+                Object value = null; // remains null when the expression has no data type
+                if (constantExpression.getDataType() != null) {
+                    value = constantExpression.getDataType().toObject(ptr, 
constantExpression.getSortOrder(),
+                            constantExpression.getMaxLength(), 
constantExpression.getScale());
+                    if 
(!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) { // value cannot be coerced to the column's type
+                        throw TypeMismatchException.newException(
+                            constantExpression.getDataType(), 
column.getDataType(), "expression: "
+                                    + constantExpression.toString() + " in 
column " + column);
+                    }
+                    if (!column.getDataType().isSizeCompatible(ptr, value, 
constantExpression.getDataType(),
+                            constantExpression.getSortOrder(), 
constantExpression.getMaxLength(),
+                            constantExpression.getScale(), 
column.getMaxLength(), column.getScale())) { // value does not fit the column's max length/scale
+                        throw new SQLExceptionInfo.Builder(
+                            
SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+                            .setMessage("value=" + 
constantExpression.toString()).build().buildException();
+                    }
+                }
+                column.getDataType().coerceBytes(ptr, value, 
constantExpression.getDataType(),
+                        constantExpression.getMaxLength(), 
constantExpression.getScale(), constantExpression.getSortOrder(),
+                        column.getMaxLength(), 
column.getScale(),column.getSortOrder(),
+                        table.rowKeyOrderOptimizable()); // ptr now holds the bytes coerced to the column's type
+                if (overlapViewColumns.contains(column) && 
Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), 
column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) { // NOTE(review): compares against all but the last byte of the view constant — presumably a trailing separator byte; confirm
+                    throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
+                            .setColumnName(column.getName().getString())
+                            .setMessage("value=" + 
constantExpression.toString()).build().buildException();
+                }
+                values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                nodeIndex++;
+            }
+            // Add columns based on view
+            for (PColumn column : addViewColumns) {
+                if (IndexUtil.getViewConstantValue(column, ptr)) {
+                    values[nodeIndex++] = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
+                } else {
+                    throw new IllegalStateException(); // getViewConstantValue reported no value for a column listed in addViewColumns
+                }
+            }
+            Map<ImmutableBytesPtr, RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(1);
+            IndexMaintainer indexMaintainer = null;
+            byte[][] viewConstants = null;
+            if (table.getIndexType() == IndexType.LOCAL) { // target is a local index: look up its parent table in the metadata cache
+                PTable parentTable =
+                        statement
+                                .getConnection()
+                                .getMetaDataCache()
+                                .getTableRef(
+                                    new 
PTableKey(statement.getConnection().getTenantId(),
+                                            
table.getParentName().getString())).getTable(); // 'statement' is not declared in this class; resolved from the enclosing scope
+                indexMaintainer = table.getIndexMaintainer(parentTable, 
connection);
+                viewConstants = IndexUtil.getViewConstants(parentTable);
+            }
+            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, 
statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 
0); // setValues is defined outside this class; the 'mutation' map passed here is what the returned state wraps
+            return new MutationState(tableRef, mutation, 0, maxSize, 
maxSizeBytes, connection);
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(2); // at most two steps: optional sequence reservation plus the put
+            if (context.getSequenceManager().getSequenceCount() > 0) {
+                planSteps.add("CLIENT RESERVE " + 
context.getSequenceManager().getSequenceCount() + " SEQUENCES");
+            }
+            planSteps.add("PUT SINGLE ROW");
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return 0l; // reports 0 (this plan wraps no query). NOTE(review): lowercase 'l' suffix is easy to misread as the digit 1; 0L is the conventional spelling
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return 0l; // reports 0, as above
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return 0l; // reports 0, as above
+        }
+    }
+
+    private class ClientUpsertSelectMutationPlan implements MutationPlan { // UPSERT SELECT plan that iterates queryPlan itself, with or without a parallel upserting iterator factory
+        private final QueryPlan queryPlan; // the plan that is iterated; also backs context, explain output and estimates
+        private final TableRef tableRef;
+        private final QueryPlan originalQueryPlan; // only used for getSourceRefs()
+        private final UpsertingParallelIteratorFactory parallelIteratorFactory; // may be null; selects the execution path in execute()
+        private final RowProjector projector;
+        private final int[] columnIndexes;
+        private final int[] pkSlotIndexes;
+        private final boolean useServerTimestamp;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ClientUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef 
tableRef, QueryPlan originalQueryPlan, UpsertingParallelIteratorFactory 
parallelIteratorFactory, RowProjector projector, int[] columnIndexes, int[] 
pkSlotIndexes, boolean useServerTimestamp, int maxSize, int maxSizeBytes) { // plain field assignment; arrays are stored by reference, not copied
+            this.queryPlan = queryPlan;
+            this.tableRef = tableRef;
+            this.originalQueryPlan = originalQueryPlan;
+            this.parallelIteratorFactory = parallelIteratorFactory;
+            this.projector = projector;
+            this.columnIndexes = columnIndexes;
+            this.pkSlotIndexes = pkSlotIndexes;
+            this.useServerTimestamp = useServerTimestamp;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return 
queryPlan.getContext().getBindManager().getParameterMetaData(); // bind metadata comes from queryPlan's context
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return queryPlan.getContext();
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return queryPlan; // the same plan that execute() iterates
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return originalQueryPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation; // not declared in this class; resolved from the enclosing scope
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ResultIterator iterator = queryPlan.iterator();
+            if (parallelIteratorFactory == null) { // no factory: hand the iterator to upsertSelect (defined outside this class)
+                return upsertSelect(new StatementContext(statement), tableRef, 
projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false); // NOTE(review): this return is outside the try/finally below, so closing the iterator is left to upsertSelect — confirm
+            }
+            try {
+                parallelIteratorFactory.setRowProjector(projector); // factory is configured before the loop below consumes the iterator
+                parallelIteratorFactory.setColumnIndexes(columnIndexes);
+                parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+                Tuple tuple;
+                long totalRowCount = 0;
+                StatementContext context = queryPlan.getContext();
+                while ((tuple=iterator.next()) != null) {// Runs query
+                    Cell kv = tuple.getValue(0); // first cell of each tuple carries a long count
+                    totalRowCount += 
PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), 
SortOrder.getDefault()); // sum the per-tuple counts
+                }
+                // Return total number of rows that have been updated. In the 
case of auto commit being off
+                // the mutations will all be in the mutation state of the 
current connection.
+                MutationState mutationState = new MutationState(maxSize, 
maxSizeBytes, statement.getConnection(), totalRowCount);
+                /*
+                 *  All the metrics collected for measuring the reads done by 
the parallel mutating iterators
+                 *  is included in the ReadMetricHolder of the statement 
context. Include these metrics in the
+                 *  returned mutation state so they can be published on commit.
+                 */
+                
mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+                return mutationState;
+            } finally {
+                iterator.close(); // covers only the parallel-factory path
+            }
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  
queryPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = 
Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); // +1 for the header step added next
+            planSteps.add("UPSERT SELECT");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return queryPlan.getEstimatedRowsToScan(); // estimates are delegated to the wrapped query plan
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return queryPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return queryPlan.getEstimateInfoTimestamp();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d35cce1..174e643 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1313,11 +1313,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 public ExplainPlan getExplainPlan() throws SQLException {
                     return new ExplainPlan(Collections.singletonList("EXECUTE 
UPGRADE"));
                 }
-                
+
                 @Override
-                public StatementContext getContext() {
-                    return new StatementContext(stmt);
-                }
+                public QueryPlan getQueryPlan() { return null; }
+
+                @Override
+                public StatementContext getContext() { return new 
StatementContext(stmt); }
                 
                 @Override
                 public TableRef getTargetRef() {

Reply via email to