http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
index ddc2004..97f3f3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
@@ -24,6 +24,7 @@ import org.apache.phoenix.schema.TableRef;
 
 
 public interface MutationPlan extends StatementPlan {
-    public MutationState execute() throws SQLException;
-    public TableRef getTargetRef();
+    MutationState execute() throws SQLException;
+    TableRef getTargetRef();
+    QueryPlan getQueryPlan();
 }
\ No newline at end of file
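Dropping the redundant `public` modifiers is cosmetic, but the new getQueryPlan() accessor is the substantive change: it exposes the read side of a mutation (for example, the SELECT feeding an UPSERT) without downcasting to a concrete plan class. A minimal sketch of a caller, assuming only what this commit shows, namely that plans for plain UPSERT VALUES return null here (see UpsertValuesMutationPlan later in this commit); the helper name is ours, not the commit's:

    // Hypothetical helper, not part of this commit.
    static List<String> readSideExplain(MutationPlan plan) throws SQLException {
        QueryPlan readSide = plan.getQueryPlan();
        // A plain UPSERT VALUES has no underlying query, so guard against null.
        return readSide == null
                ? Collections.<String>emptyList()
                : readSide.getExplainPlan().getPlanSteps();
    }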

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index af2254b..287f9e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -307,7 +307,7 @@ public class QueryCompiler {
                 JoinSpec joinSpec = joinSpecs.get(i);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
                 joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
-                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], true);
+                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT);
                 joinExpressions[i] = joinConditions.getFirst();
                 List<Expression> hashExpressions = joinConditions.getSecond();
                 Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
@@ -369,7 +369,7 @@ public class QueryCompiler {
             context.setCurrentTable(rhsTableRef);
             context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
-            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
+            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT);
             List<Expression> joinExpressions = joinConditions.getSecond();
             List<Expression> hashExpressions = joinConditions.getFirst();
             boolean needsMerge = lhsJoin.hasPostReference();
@@ -422,7 +422,7 @@ public class QueryCompiler {
         QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
         PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
         
-        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, false);
+        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE);
         List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
         List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
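Replacing the boolean third argument with an explicit JoinCompiler.Strategy value makes these call sites self-documenting: the old `true`/`false` encoded which join strategy applied, which a reader could not tell without opening compileJoinConditions. Judging only from the three constants used above, the enum presumably has this shape; the authoritative definition lives in JoinCompiler:

    // Inferred from the call sites above; the names are the ones the diff uses.
    public enum Strategy {
        HASH_BUILD_RIGHT, // hash join, hash table built from the right-hand side
        HASH_BUILD_LEFT,  // hash join, hash table built from the left-hand side
        SORT_MERGE        // sort-merge join
    }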
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index f7cdcbf..ca88984 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -52,7 +53,9 @@ public interface QueryPlan extends StatementPlan {
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
 
     public long getEstimatedSize();
-    
+
+    public Cost getCost();
+
     // TODO: change once joins are supported
     TableRef getTableRef();
     /**
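The new getCost() method gives every plan a cost estimate, which is the hook a cost-based optimizer needs in order to rank alternative plans for the same statement. A sketch of the intended use, under the assumption (not shown in this hunk) that Cost is comparable:

    // Sketch only; assumes Cost implements Comparable<Cost>.
    static QueryPlan cheaper(QueryPlan a, QueryPlan b) {
        return a.getCost().compareTo(b.getCost()) <= 0 ? a : b;
    }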

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 62e6991..2714858 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -46,6 +46,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
@@ -194,6 +195,11 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        return Cost.ZERO;
+    }
+
+    @Override
     public Set<TableRef> getSourceRefs() {
         return Collections.emptySet();
     }
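TRACE is a client-only metadata operation that reads no table data, so Cost.ZERO is the natural answer here; presumably the other client-only plans touched by this commit return the same constant so that cost-based comparison never penalizes them.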

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 9eaaf62..a81a427 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
 import org.apache.phoenix.expression.Determinism;
@@ -116,9 +117,10 @@ import com.google.common.collect.Sets;
 public class UpsertCompiler {
 
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
-            PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
+            PTable table, MultiRowMutationState mutation,
             PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
             byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
+        long columnValueSize = 0;
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -148,6 +150,7 @@ public class UpsertCompiler {
                 }
             } else {
                 columnValues.put(column, value);
+                columnValueSize += (column.getEstimatedSize() + value.length);
             }
         }
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
@@ -166,7 +169,7 @@ public class UpsertCompiler {
                     regionPrefix.length));
             }
         } 
-        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
+        mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
     }
     
     public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
@@ -195,7 +198,7 @@ public class UpsertCompiler {
             }
         }
         int rowCount = 0;
-        Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+        MultiRowMutationState mutation = new MultiRowMutationState(batchSize);
         PTable table = tableRef.getTable();
         IndexMaintainer indexMaintainer = null;
         byte[][] viewConstants = null;
@@ -695,173 +698,13 @@ public class UpsertCompiler {
                     
                     // Ignore order by - it has no impact
                    final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
-                    return new MutationPlan() {
-                        @Override
-                        public ParameterMetaData getParameterMetaData() {
-                            return queryPlan.getContext().getBindManager().getParameterMetaData();
-                        }
-    
-                        @Override
-                        public StatementContext getContext() {
-                            return queryPlan.getContext();
-                        }
-
-                        @Override
-                        public TableRef getTargetRef() {
-                            return tableRef;
-                        }
-
-                        @Override
-                        public Set<TableRef> getSourceRefs() {
-                            return originalQueryPlan.getSourceRefs();
-                        }
-
-                               @Override
-                               public Operation getOperation() {
-                                       return operation;
-                               }
-
-                        @Override
-                        public MutationState execute() throws SQLException {
-                            ImmutableBytesWritable ptr = context.getTempPtr();
-                            PTable table = tableRef.getTable();
-                            table.getIndexMaintainers(ptr, context.getConnection());
-                            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-
-                            if (ptr.getLength() > 0) {
-                                byte[] uuidValue = ServerCacheClient.generateId();
-                                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                            }
-                            ResultIterator iterator = aggPlan.iterator();
-                            try {
-                                Tuple row = iterator.next();
-                                final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row,
-                                        PLong.INSTANCE, ptr);
-                                return new MutationState(maxSize, maxSizeBytes, connection) {
-                                    @Override
-                                    public long getUpdateCount() {
-                                        return mutationCount;
-                                    }
-                                };
-                            } finally {
-                                iterator.close();
-                            }
-                            
-                        }
-
-                        @Override
-                        public ExplainPlan getExplainPlan() throws SQLException {
-                            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
-                            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                            planSteps.add("UPSERT ROWS");
-                            planSteps.addAll(queryPlanSteps);
-                            return new ExplainPlan(planSteps);
-                        }
-
-                        @Override
-                        public Long getEstimatedRowsToScan() throws SQLException {
-                            return aggPlan.getEstimatedRowsToScan();
-                        }
-
-                        @Override
-                        public Long getEstimatedBytesToScan() throws SQLException {
-                            return aggPlan.getEstimatedBytesToScan();
-                        }
-
-                        @Override
-                        public Long getEstimateInfoTimestamp() throws SQLException {
-                            return aggPlan.getEstimateInfoTimestamp();
-                        }
-                    };
+                    return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes);
                 }
             }
             ////////////////////////////////////////////////////////////////////
             // UPSERT SELECT run client-side
             /////////////////////////////////////////////////////////////////////
-            return new MutationPlan() {
-                @Override
-                public ParameterMetaData getParameterMetaData() {
-                    return queryPlan.getContext().getBindManager().getParameterMetaData();
-                }
-
-                @Override
-                public StatementContext getContext() {
-                    return queryPlan.getContext();
-                }
-
-                @Override
-                public TableRef getTargetRef() {
-                    return tableRef;
-                }
-
-                @Override
-                public Set<TableRef> getSourceRefs() {
-                    return originalQueryPlan.getSourceRefs();
-                }
-
-                       @Override
-                       public Operation getOperation() {
-                               return operation;
-                       }
-
-                @Override
-                public MutationState execute() throws SQLException {
-                    ResultIterator iterator = queryPlan.iterator();
-                    if (parallelIteratorFactory == null) {
-                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
-                    }
-                    try {
-                        parallelIteratorFactory.setRowProjector(projector);
-                        parallelIteratorFactory.setColumnIndexes(columnIndexes);
-                        parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
-                        Tuple tuple;
-                        long totalRowCount = 0;
-                        StatementContext context = queryPlan.getContext();
-                        while ((tuple=iterator.next()) != null) {// Runs query
-                            Cell kv = tuple.getValue(0);
-                            totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
-                        }
-                        // Return total number of rows that have been updated. In the case of auto commit being off
-                        // the mutations will all be in the mutation state of the current connection.
-                        MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
-                        /*
-                         *  All the metrics collected for measuring the reads done by the parallel mutating iterators
-                         *  is included in the ReadMetricHolder of the statement context. Include these metrics in the
-                         *  returned mutation state so they can be published on commit. 
-                         */
-                        mutationState.setReadMetricQueue(context.getReadMetricsQueue());
-                        return mutationState; 
-                    } finally {
-                        iterator.close();
-                    }
-                }
-
-                @Override
-                public ExplainPlan getExplainPlan() throws SQLException {
-                    List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
-                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                    planSteps.add("UPSERT SELECT");
-                    planSteps.addAll(queryPlanSteps);
-                    return new ExplainPlan(planSteps);
-                }
-
-                @Override
-                public Long getEstimatedRowsToScan() throws SQLException {
-                    return queryPlan.getEstimatedRowsToScan();
-                }
-
-                @Override
-                public Long getEstimatedBytesToScan() throws SQLException {
-                    return queryPlan.getEstimatedBytesToScan();
-                }
-
-                @Override
-                public Long getEstimateInfoTimestamp() throws SQLException {
-                    return queryPlan.getEstimateInfoTimestamp();
-                }
-            };
+            return new ClientUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, parallelIteratorFactory, projector, columnIndexes, pkSlotIndexes, useServerTimestamp, maxSize, maxSizeBytes);
         }
 
             
@@ -987,124 +830,9 @@ public class UpsertCompiler {
         }
         final byte[] onDupKeyBytes = onDupKeyBytesToBe;
         
-        return new MutationPlan() {
-            @Override
-            public ParameterMetaData getParameterMetaData() {
-                return context.getBindManager().getParameterMetaData();
-            }
-
-            @Override
-            public StatementContext getContext() {
-                return context;
-            }
-
-            @Override
-            public TableRef getTargetRef() {
-                return tableRef;
-            }
-
-            @Override
-            public Set<TableRef> getSourceRefs() {
-                return Collections.emptySet();
-            }
-
-               @Override
-               public Operation getOperation() {
-                       return operation;
-               }
-
-            @Override
-            public MutationState execute() throws SQLException {
-                ImmutableBytesWritable ptr = context.getTempPtr();
-                final SequenceManager sequenceManager = context.getSequenceManager();
-                // Next evaluate all the expressions
-                int nodeIndex = nodeIndexOffset;
-                PTable table = tableRef.getTable();
-                Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
-                    sequenceManager.newSequenceTuple(null);
-                for (Expression constantExpression : constantExpressions) {
-                    PColumn column = allColumns.get(columnIndexes[nodeIndex]);
-                    constantExpression.evaluate(tuple, ptr);
-                    Object value = null;
-                    if (constantExpression.getDataType() != null) {
-                        value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(), constantExpression.getMaxLength(), constantExpression.getScale());
-                        if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) { 
-                            throw TypeMismatchException.newException(
-                                constantExpression.getDataType(), column.getDataType(), "expression: "
-                                        + constantExpression.toString() + " in column " + column);
-                        }
-                        if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
-                                constantExpression.getSortOrder(), constantExpression.getMaxLength(), 
-                                constantExpression.getScale(), column.getMaxLength(), column.getScale())) { 
-                            throw new SQLExceptionInfo.Builder(
-                                SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                                .setMessage("value=" + constantExpression.toString()).build().buildException();
-                        }
-                    }
-                    column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(), 
-                            constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
-                            column.getMaxLength(), column.getScale(),column.getSortOrder(),
-                            table.rowKeyOrderOptimizable());
-                    if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
-                        throw new SQLExceptionInfo.Builder(
-                                SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
-                                .setColumnName(column.getName().getString())
-                                .setMessage("value=" + constantExpression.toString()).build().buildException();
-                    }
-                    values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                    nodeIndex++;
-                }
-                // Add columns based on view
-                for (PColumn column : addViewColumns) {
-                    if (IndexUtil.getViewConstantValue(column, ptr)) {
-                        values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                    } else {
-                        throw new IllegalStateException();
-                    }
-                }
-                Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
-                IndexMaintainer indexMaintainer = null;
-                byte[][] viewConstants = null;
-                if (table.getIndexType() == IndexType.LOCAL) {
-                    PTable parentTable =
-                            statement
-                                    .getConnection()
-                                    .getMetaDataCache()
-                                    .getTableRef(
-                                        new PTableKey(statement.getConnection().getTenantId(),
-                                                table.getParentName().getString())).getTable();
-                    indexMaintainer = table.getIndexMaintainer(parentTable, connection);
-                    viewConstants = IndexUtil.getViewConstants(parentTable);
-                }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
-                return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
-            }
-
-            @Override
-            public ExplainPlan getExplainPlan() throws SQLException {
-                List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
-                if (context.getSequenceManager().getSequenceCount() > 0) {
-                    planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
-                }
-                planSteps.add("PUT SINGLE ROW");
-                return new ExplainPlan(planSteps);
-            }
-
-            @Override
-            public Long getEstimatedRowsToScan() throws SQLException {
-                return 0l;
-            }
-
-            @Override
-            public Long getEstimatedBytesToScan() throws SQLException {
-                return 0l;
-            }
-
-            @Override
-            public Long getEstimateInfoTimestamp() throws SQLException {
-                return 0l;
-            }
-        };
+        return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressions,
+                allColumns, columnIndexes, overlapViewColumns, values, addViewColumns,
+                connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, maxSize, maxSizeBytes);
     }
     
    private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable table) {
@@ -1215,4 +943,394 @@ public class UpsertCompiler {
             }
         }
     }
+
+    private class ServerUpsertSelectMutationPlan implements MutationPlan {
+        private final QueryPlan queryPlan;
+        private final TableRef tableRef;
+        private final QueryPlan originalQueryPlan;
+        private final StatementContext context;
+        private final PhoenixConnection connection;
+        private final Scan scan;
+        private final QueryPlan aggPlan;
+        private final RowProjector aggProjector;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ServerUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan,
+                                              StatementContext context, PhoenixConnection connection,
+                                              Scan scan, QueryPlan aggPlan, RowProjector aggProjector,
+                                              int maxSize, int maxSizeBytes) {
+            this.queryPlan = queryPlan;
+            this.tableRef = tableRef;
+            this.originalQueryPlan = originalQueryPlan;
+            this.context = context;
+            this.connection = connection;
+            this.scan = scan;
+            this.aggPlan = aggPlan;
+            this.aggProjector = aggProjector;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return queryPlan.getContext().getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return queryPlan.getContext();
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return aggPlan;
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return originalQueryPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            PTable table = tableRef.getTable();
+            table.getIndexMaintainers(ptr, context.getConnection());
+            byte[] txState = table.isTransactional() ?
+                    connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+
+            if (ptr.getLength() > 0) {
+                byte[] uuidValue = ServerCacheClient.generateId();
+                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+            }
+            ResultIterator iterator = aggPlan.iterator();
+            try {
+                Tuple row = iterator.next();
+                final long mutationCount = (Long) aggProjector.getColumnProjector(0).getValue(row,
+                        PLong.INSTANCE, ptr);
+                return new MutationState(maxSize, maxSizeBytes, connection) {
+                    @Override
+                    public long getUpdateCount() {
+                        return mutationCount;
+                    }
+                };
+            } finally {
+                iterator.close();
+            }
+
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("UPSERT ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return aggPlan.getEstimatedRowsToScan();
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return aggPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return aggPlan.getEstimateInfoTimestamp();
+        }
+    }
+
+    private class UpsertValuesMutationPlan implements MutationPlan {
+        private final StatementContext context;
+        private final TableRef tableRef;
+        private final int nodeIndexOffset;
+        private final List<Expression> constantExpressions;
+        private final List<PColumn> allColumns;
+        private final int[] columnIndexes;
+        private final Set<PColumn> overlapViewColumns;
+        private final byte[][] values;
+        private final Set<PColumn> addViewColumns;
+        private final PhoenixConnection connection;
+        private final int[] pkSlotIndexes;
+        private final boolean useServerTimestamp;
+        private final byte[] onDupKeyBytes;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public UpsertValuesMutationPlan(StatementContext context, TableRef tableRef, int nodeIndexOffset,
+                                        List<Expression> constantExpressions, List<PColumn> allColumns,
+                                        int[] columnIndexes, Set<PColumn> overlapViewColumns, byte[][] values,
+                                        Set<PColumn> addViewColumns, PhoenixConnection connection,
+                                        int[] pkSlotIndexes, boolean useServerTimestamp, byte[] onDupKeyBytes,
+                                        int maxSize, int maxSizeBytes) {
+            this.context = context;
+            this.tableRef = tableRef;
+            this.nodeIndexOffset = nodeIndexOffset;
+            this.constantExpressions = constantExpressions;
+            this.allColumns = allColumns;
+            this.columnIndexes = columnIndexes;
+            this.overlapViewColumns = overlapViewColumns;
+            this.values = values;
+            this.addViewColumns = addViewColumns;
+            this.connection = connection;
+            this.pkSlotIndexes = pkSlotIndexes;
+            this.useServerTimestamp = useServerTimestamp;
+            this.onDupKeyBytes = onDupKeyBytes;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return null;
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            final SequenceManager sequenceManager = context.getSequenceManager();
+            // Next evaluate all the expressions
+            int nodeIndex = nodeIndexOffset;
+            PTable table = tableRef.getTable();
+            Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
+                sequenceManager.newSequenceTuple(null);
+            for (Expression constantExpression : constantExpressions) {
+                PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+                constantExpression.evaluate(tuple, ptr);
+                Object value = null;
+                if (constantExpression.getDataType() != null) {
+                    value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(),
+                            constantExpression.getMaxLength(), constantExpression.getScale());
+                    if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
+                        throw TypeMismatchException.newException(
+                            constantExpression.getDataType(), column.getDataType(), "expression: "
+                                    + constantExpression.toString() + " in column " + column);
+                    }
+                    if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
+                            constantExpression.getSortOrder(), constantExpression.getMaxLength(),
+                            constantExpression.getScale(), column.getMaxLength(), column.getScale())) {
+                        throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+                            .setMessage("value=" + constantExpression.toString()).build().buildException();
+                    }
+                }
+                column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(),
+                        constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
+                        column.getMaxLength(), column.getScale(),column.getSortOrder(),
+                        table.rowKeyOrderOptimizable());
+                if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
+                    throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
+                            .setColumnName(column.getName().getString())
+                            .setMessage("value=" + constantExpression.toString()).build().buildException();
+                }
+                values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                nodeIndex++;
+            }
+            // Add columns based on view
+            for (PColumn column : addViewColumns) {
+                if (IndexUtil.getViewConstantValue(column, ptr)) {
+                    values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                } else {
+                    throw new IllegalStateException();
+                }
+            }
+            MultiRowMutationState mutation = new MultiRowMutationState(1);
+            IndexMaintainer indexMaintainer = null;
+            byte[][] viewConstants = null;
+            if (table.getIndexType() == IndexType.LOCAL) {
+                PTable parentTable =
+                        statement
+                                .getConnection()
+                                .getMetaDataCache()
+                                .getTableRef(
+                                    new PTableKey(statement.getConnection().getTenantId(),
+                                            table.getParentName().getString())).getTable();
+                indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+                viewConstants = IndexUtil.getViewConstants(parentTable);
+            }
+            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
+            return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
+            if (context.getSequenceManager().getSequenceCount() > 0) {
+                planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
+            }
+            planSteps.add("PUT SINGLE ROW");
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return 0l;
+        }
+    }
+
+    private class ClientUpsertSelectMutationPlan implements MutationPlan {
+        private final QueryPlan queryPlan;
+        private final TableRef tableRef;
+        private final QueryPlan originalQueryPlan;
+        private final UpsertingParallelIteratorFactory parallelIteratorFactory;
+        private final RowProjector projector;
+        private final int[] columnIndexes;
+        private final int[] pkSlotIndexes;
+        private final boolean useServerTimestamp;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ClientUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan, UpsertingParallelIteratorFactory parallelIteratorFactory, RowProjector projector, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, int maxSize, int maxSizeBytes) {
+            this.queryPlan = queryPlan;
+            this.tableRef = tableRef;
+            this.originalQueryPlan = originalQueryPlan;
+            this.parallelIteratorFactory = parallelIteratorFactory;
+            this.projector = projector;
+            this.columnIndexes = columnIndexes;
+            this.pkSlotIndexes = pkSlotIndexes;
+            this.useServerTimestamp = useServerTimestamp;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return queryPlan.getContext().getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return queryPlan.getContext();
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return queryPlan;
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return originalQueryPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ResultIterator iterator = queryPlan.iterator();
+            if (parallelIteratorFactory == null) {
+                return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
+            }
+            try {
+                parallelIteratorFactory.setRowProjector(projector);
+                parallelIteratorFactory.setColumnIndexes(columnIndexes);
+                parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+                Tuple tuple;
+                long totalRowCount = 0;
+                StatementContext context = queryPlan.getContext();
+                while ((tuple=iterator.next()) != null) {// Runs query
+                    Cell kv = tuple.getValue(0);
+                    totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                }
+                // Return total number of rows that have been updated. In the case of auto commit being off
+                // the mutations will all be in the mutation state of the current connection.
+                MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
+                /*
+                 *  All the metrics collected for measuring the reads done by the parallel mutating iterators
+                 *  is included in the ReadMetricHolder of the statement context. Include these metrics in the
+                 *  returned mutation state so they can be published on commit.
+                 */
+                mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+                return mutationState;
+            } finally {
+                iterator.close();
+            }
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("UPSERT SELECT");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return queryPlan.getEstimatedRowsToScan();
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return queryPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return queryPlan.getEstimateInfoTimestamp();
+        }
+    }
 }
\ No newline at end of file
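Two independent things happen in this file. First, the three anonymous MutationPlan implementations are extracted verbatim into the named inner classes above, each gaining the new getQueryPlan() override. Second, the raw Map<ImmutableBytesPtr, RowMutationState> is replaced by MultiRowMutationState, and a per-row byte estimate (columnValueSize) is threaded into RowMutationState, which lets the client track how much memory buffered mutations consume. A rough sketch of the shape this implies; the real class lives in MutationState and its details may differ:

    // Rough sketch under stated assumptions; calculateEstimatedSize() is a
    // hypothetical accessor for the per-row estimate set in setValues() above.
    public static class MultiRowMutationState {
        private final Map<ImmutableBytesPtr, RowMutationState> rowKeyToRowMutationState;
        private long estimatedSize; // running byte estimate across all rows

        public MultiRowMutationState(int size) {
            this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size);
        }

        public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState state) {
            estimatedSize += state.calculateEstimatedSize();
            return rowKeyToRowMutationState.put(ptr, state);
        }

        public long estimatedSize() {
            return estimatedSize;
        }
    }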

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 5e7b996..7bf8259 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -326,11 +326,6 @@ public class WhereOptimizer {
         PTable table = context.getCurrentTable().getTable();
         for (int i = 0; i < expressions.size(); i++) {
             Expression expression = expressions.get(i);
-            // TODO this is a temporary fix for PHOENIX-3029.
-            if (expression instanceof CoerceExpression
-                    && expression.getSortOrder() != expression.getChildren().get(0).getSortOrder()) {
-                continue;
-            }
             KeyExpressionVisitor visitor = new KeyExpressionVisitor(context, table);
             KeyExpressionVisitor.KeySlots keySlots = expression.accept(visitor);
             int minPkPos = Integer.MAX_VALUE; 
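The deleted block was the temporary PHOENIX-3029 workaround: it skipped row-key extraction entirely whenever a CoerceExpression changed the sort order of its child. Removing it suggests the sort-order handling in key extraction now copes with such coercions directly (presumably fixed elsewhere), so these expressions can once again contribute to key ranges.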

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
new file mode 100644
index 0000000..8decc8c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+
+public class BaseMetaDataEndpointObserver implements MetaDataEndpointObserver{
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+
+    }
+
+    @Override
+    public void preGetTable(
+            org.apache.hadoop.hbase.coprocessor.ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
+            String tenantId, String tableName, TableName physicalTableName) throws IOException {
+
+    }
+
+    
+    @Override
+    public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
+            Set<byte[]> familySet, Set<TableName> indexes) throws IOException {
+
+    }
+
+    @Override
+    public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
+            List<PTable> indexes) throws IOException {
+
+    }
+
+    @Override
+    public void preAlterTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType type) throws IOException {
+
+    }
+
+    @Override
+    public void preGetSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+            throws IOException {
+
+    }
+
+    @Override
+    public void preCreateSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+            throws IOException {
+
+    }
+
+    @Override
+    public void preDropSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName) throws IOException {
+
+    }
+
+    @Override
+    public void preCreateFunction(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String functionName) throws IOException {
+
+    }
+
+    @Override
+    public void preDropFunction(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, String functionName)
+            throws IOException {}
+
+    @Override
+    public void preGetFunctions(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, String functionName)
+            throws IOException {
+
+    }
+
+    @Override
+    public void preIndexUpdate(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String indexName, TableName physicalTableName, TableName parentPhysicalTableName, PIndexState newState)
+            throws IOException {
+
+    }
+}
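Because every hook above is a no-op, concrete observers override only the operations they care about. A hypothetical subclass, purely illustrative (the class name and policy are ours, not the commit's):

    // Hypothetical subclass; everything not overridden inherits the no-ops above.
    public class ReadOnlyMetaDataObserver extends BaseMetaDataEndpointObserver {
        @Override
        public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
                String tenantId, String tableName, TableName physicalTableName,
                TableName parentPhysicalTableName, PTableType tableType,
                List<PTable> indexes) throws IOException {
            // Illustrative policy: veto all drops. AccessDeniedException is an
            // IOException subclass, so it satisfies the declared signature.
            throw new org.apache.hadoop.hbase.security.AccessDeniedException(
                    "DROP of " + tableName + " is not permitted");
        }
    }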

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index d05ab79..afbd63f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -84,6 +84,7 @@ import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -91,10 +92,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -105,6 +108,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -121,9 +125,12 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.RpcServer.Call;
+import org.apache.hadoop.hbase.ipc.RpcUtil;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -452,7 +459,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
     private static final int MIN_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV);
     private static final int MAX_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV);
-    
+
     private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) {
         if (keyLength <= 0) {
             return null;
@@ -463,6 +470,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
     private RegionCoprocessorEnvironment env;
 
+    private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost;
+    private boolean accessCheckEnabled;
+
     /**
      * Stores a reference to the coprocessor environment provided by the
      * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
@@ -480,6 +490,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         } else {
             throw new CoprocessorException("Must be loaded on a table region!");
         }
+        
+        phoenixAccessCoprocessorHost = new PhoenixMetaDataCoprocessorHost(this.env);
+        this.accessCheckEnabled = env.getConfiguration().getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
+                QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
         logger.info("Starting Tracing-Metrics Systems");
         // Start the phoenix trace collection
         Tracing.addTraceMetricsSource();
@@ -523,6 +537,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(builder.build());
                 return;
             }
+            getCoprocessorHost().preGetTable(Bytes.toString(tenantId), SchemaUtil.getTableName(schemaName, tableName),
+                    TableName.valueOf(table.getPhysicalName().getBytes()));
+
             builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
             long disableIndexTimestamp = table.getIndexDisableTimestamp();
             long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
@@ -554,6 +571,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
+    private PhoenixMetaDataCoprocessorHost getCoprocessorHost() {
+        return phoenixAccessCoprocessorHost;
+    }
+
     private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
             long clientTimeStamp, int clientVersion) throws IOException, SQLException {
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
@@ -1317,12 +1338,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
      * @return null if the physical table row information is not present.
      * 
      */
-    private static Mutation getPhysicalTableForView(List<Mutation> tableMetadata, byte[][] parentSchemaTableNames) {
+    private static Mutation getPhysicalTableRowForView(List<Mutation> tableMetadata, byte[][] parentTenantSchemaTableNames, byte[][] physicalSchemaTableNames) {
         int size = tableMetadata.size();
         byte[][] rowKeyMetaData = new byte[3][];
         MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
         Mutation physicalTableRow = null;
+        Mutation parentTableRow = null;
         boolean physicalTableLinkFound = false;
+        boolean parentTableLinkFound = false;
         if (size >= 2) {
             int i = size - 1;
             while (i >= 1) {
@@ -1332,28 +1355,51 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     if (linkType == LinkType.PHYSICAL_TABLE) {
                         physicalTableRow = m;
                         physicalTableLinkFound = true;
-                        break;
                     }
+                    if (linkType == LinkType.PARENT_TABLE) {
+                        parentTableRow=m;
+                        parentTableLinkFound = true;
+                    }
+                }
+                if(physicalTableLinkFound && parentTableLinkFound){
+                    break;
                 }
                 i--;
             }
         }
+        if (!parentTableLinkFound) {
+            parentTenantSchemaTableNames[0] = null;
+            parentTenantSchemaTableNames[1] = null;
+            parentTenantSchemaTableNames[2] = null;
+        }
         if (!physicalTableLinkFound) {
-            parentSchemaTableNames[0] = null;
-            parentSchemaTableNames[1] = null;
-            return null;
+            physicalSchemaTableNames[0] = null;
+            physicalSchemaTableNames[1] = null;
+            physicalSchemaTableNames[2] = null;
+        }
+        if (physicalTableLinkFound) {
+            getSchemaTableNames(physicalTableRow, physicalSchemaTableNames);
+        }
+        if (parentTableLinkFound) {
+            getSchemaTableNames(parentTableRow, parentTenantSchemaTableNames);
 
         }
-        rowKeyMetaData = new byte[5][];
-        getVarChars(physicalTableRow.getRow(), 5, rowKeyMetaData);
+        return physicalTableRow;
+    }
+
+    private static void getSchemaTableNames(Mutation row, byte[][] schemaTableNames) {
+        byte[][] rowKeyMetaData = new byte[5][];
+        getVarChars(row.getRow(), 5, rowKeyMetaData);
+        byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
         byte[] colBytes = rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX];
         byte[] famBytes = rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX];
         if ((colBytes == null || colBytes.length == 0) && (famBytes != null && famBytes.length > 0)) {
             byte[] sName = SchemaUtil.getSchemaNameFromFullName(famBytes).getBytes();
             byte[] tName = SchemaUtil.getTableNameFromFullName(famBytes).getBytes();
-            parentSchemaTableNames[0] = sName;
-            parentSchemaTableNames[1] = tName;
+            schemaTableNames[0] = tenantId;
+            schemaTableNames[1] = sName;
+            schemaTableNames[2] = tName;
         }
-        return physicalTableRow;
     }
     
     @Override
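
The refactoring above works because SYSTEM.CATALOG row keys are null-byte-separated VARCHAR fields, and LINK rows carry the full parent or physical table name in the slot otherwise used for the column family. A rough sketch of that decoding, under that stated assumption about the layout (an illustration, not Phoenix's actual getVarChars implementation):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: split a null-separated row key into a fixed number of fields,
    // as the 5-way split above yields tenant, schema, table, column, family.
    final class RowKeySketch {
        static byte[][] splitVarChars(byte[] rowKey, int nFields) {
            List<byte[]> fields = new ArrayList<byte[]>();
            int start = 0;
            // collect the first nFields - 1 fields at each zero separator
            for (int i = 0; i < rowKey.length && fields.size() < nFields - 1; i++) {
                if (rowKey[i] == 0) {
                    byte[] f = new byte[i - start];
                    System.arraycopy(rowKey, start, f, 0, f.length);
                    fields.add(f);
                    start = i + 1;
                }
            }
            // the last field is whatever remains (possibly empty)
            int s = Math.min(start, rowKey.length);
            byte[] last = new byte[rowKey.length - s];
            System.arraycopy(rowKey, s, last, 0, last.length);
            fields.add(last);
            while (fields.size() < nFields) fields.add(new byte[0]);
            return fields.toArray(new byte[fields.size()][]);
        }
    }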
@@ -1370,25 +1416,76 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
             schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
             tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-
+            boolean isNamespaceMapped = MetaDataUtil.isNameSpaceMapped(tableMetadata, GenericKeyValueBuilder.INSTANCE,
+                    new ImmutableBytesWritable());
+            final IndexType indexType = MetaDataUtil.getIndexType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
+                    new ImmutableBytesWritable());
             byte[] parentSchemaName = null;
             byte[] parentTableName = null;
             PTableType tableType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
             byte[] parentTableKey = null;
             Mutation viewPhysicalTableRow = null;
+            Set<TableName> indexes = new HashSet<TableName>();
+            byte[] cPhysicalName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped)
+                    .getBytes();
+            byte[] cParentPhysicalName = null;
             if (tableType == PTableType.VIEW) {
-                byte[][] parentSchemaTableNames = new byte[2][];
+                byte[][] parentSchemaTableNames = new byte[3][];
+                byte[][] parentPhysicalSchemaTableNames = new byte[3][];
                 /*
                  * For a view, we lock the base physical table row. For a mapped view, there is
                  * no link present to the physical table. So the viewPhysicalTableRow is null
                  * in that case.
                  */
-                viewPhysicalTableRow = getPhysicalTableForView(tableMetadata, parentSchemaTableNames);
-                parentSchemaName = parentSchemaTableNames[0];
-                parentTableName = parentSchemaTableNames[1];
-                if (parentTableName != null) {
-                    parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, parentSchemaName, parentTableName);
+
+                viewPhysicalTableRow = getPhysicalTableRowForView(tableMetadata, parentSchemaTableNames, parentPhysicalSchemaTableNames);
+                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+                if (parentPhysicalSchemaTableNames[2] != null) {
+                    parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
+                            parentPhysicalSchemaTableNames[1], parentPhysicalSchemaTableNames[2]);
+                    PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
+                            clientTimeStamp, clientTimeStamp, clientVersion);
+                    if (parentTable == null) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        done.run(builder.build());
+                        return;
+                    }
+                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
+                    if (parentSchemaTableNames[2] != null
+                            && Bytes.compareTo(parentSchemaTableNames[2], parentPhysicalSchemaTableNames[2]) != 0) {
+                        // if view is created on view
+                        byte[] parentKey = SchemaUtil.getTableKey(
+                                parentSchemaTableNames[0] == null ? ByteUtil.EMPTY_BYTE_ARRAY : parentSchemaTableNames[0],
+                                parentSchemaTableNames[1], parentSchemaTableNames[2]);
+                        parentTable = loadTable(env, parentKey, new ImmutableBytesPtr(parentKey),
+                                clientTimeStamp, clientTimeStamp, clientVersion);
+                        if (parentTable == null) {
+                            // it could be a global view
+                            parentKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
+                                    parentSchemaTableNames[1], parentSchemaTableNames[2]);
+                            parentTable = loadTable(env, parentKey, new ImmutableBytesPtr(parentKey),
+                                    clientTimeStamp, clientTimeStamp, clientVersion);
+                        }
+                    }
+                    if (parentTable == null) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        done.run(builder.build());
+                        return;
+                    }
+                    for (PTable index : parentTable.getIndexes()) {
+                        indexes.add(TableName.valueOf(index.getPhysicalName().getBytes()));
+                    }
+
+                } else {
+                    // Mapped View
+                    cParentPhysicalName = SchemaUtil.getTableNameAsBytes(schemaName, tableName);
                 }
+                parentSchemaName = parentPhysicalSchemaTableNames[1];
+                parentTableName = parentPhysicalSchemaTableNames[2];
+
             } else if (tableType == PTableType.INDEX) {
                 parentSchemaName = schemaName;
                 /* 
@@ -1398,7 +1495,27 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                  */
                 parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
                 parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName);
+                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+                PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
+                        clientTimeStamp, clientTimeStamp, clientVersion);
+                if (IndexType.LOCAL == indexType) {
+                    cPhysicalName = parentTable.getPhysicalName().getBytes();
+                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
+                } else if (parentTable.getType() == PTableType.VIEW) {
+                    cPhysicalName = MetaDataUtil.getViewIndexPhysicalName(parentTable.getPhysicalName().getBytes());
+                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
+                } else {
+                    cParentPhysicalName = SchemaUtil
+                            .getPhysicalHBaseTableName(parentSchemaName, parentTableName, isNamespaceMapped).getBytes();
+                }
             }
+
+            getCoprocessorHost().preCreateTable(Bytes.toString(tenantIdBytes),
+                    SchemaUtil.getTableName(schemaName, tableName),
+                    (tableType == PTableType.VIEW) ? null : TableName.valueOf(cPhysicalName),
+                    cParentPhysicalName == null ? null : TableName.valueOf(cParentPhysicalName), tableType,
+                    /* TODO: During initial create we may not need the family map */
+                    Collections.<byte[]> emptySet(), indexes);
 
             Region region = env.getRegion();
             List<RowLock> locks = Lists.newArrayList();
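
Note the ordering this hunk establishes in createTable: preCreateTable fires before any row locks are taken, so an unauthorized caller fails fast without touching SYSTEM.CATALOG. The dispatch follows the usual coprocessor-host veto convention, roughly as in this sketch (illustrative names; PhoenixMetaDataCoprocessorHost itself is not shown in this diff):

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Sketch of the pre-hook veto convention: any registered observer may throw
    // (e.g. an AccessDeniedException, an IOException subclass) to abort the
    // operation before metadata rows are locked or mutated.
    final class MetaDataHostSketch {
        interface Observer {
            void preCreateTable(String tenantId, String fullTableName) throws IOException;
        }

        private final List<Observer> observers = new CopyOnWriteArrayList<Observer>();

        void add(Observer o) { observers.add(o); }

        void preCreateTable(String tenantId, String fullTableName) throws IOException {
            for (Observer o : observers) {
                o.preCreateTable(tenantId, fullTableName); // throwing vetoes the create
            }
        }
    }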
@@ -1613,7 +1730,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // primary and then index table locks are held, in that order). For now, we just don't support
                 // indexing on the system table. This is an issue because of the way we manage batch mutation
                 // in the Indexer.
-                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
 
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
@@ -1632,7 +1749,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(builder.build());
                 return;
             } finally {
-                region.releaseRowLocks(locks);
+                releaseRowLocks(region, locks);
             }
         } catch (Throwable t) {
             logger.error("createTable failed", t);
@@ -1648,16 +1765,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 QueryServicesOptions.DEFAULT_MAX_INDEXES_PER_TABLE);
     }
 
-    private static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks)
-        throws IOException {
-        RowLock rowLock = region.getRowLock(key, false);
-        if (rowLock == null) {
-            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
-        }
-        locks.add(rowLock);
-        return rowLock;
-    }
-
     private static final byte[] CHILD_TABLE_BYTES = new byte[] {PTable.LinkType.CHILD_TABLE.getSerializedValue()};
 
     
@@ -1846,6 +1953,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
                         schemaName, tableName);
 
+            PTableType ptableType = PTableType.fromSerializedValue(tableType);
+            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+            byte[] cKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
+            PTable loadedTable = loadTable(env, cKey, new ImmutableBytesPtr(cKey), clientTimeStamp, clientTimeStamp,
+                    request.getClientVersion());
+            if (loadedTable == null) {
+                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                done.run(builder.build());
+                return;
+            }
+            getCoprocessorHost().preDropTable(Bytes.toString(tenantIdBytes),
+                    SchemaUtil.getTableName(schemaName, tableName),
+                    TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
+                    getParentPhysicalTableName(loadedTable), ptableType, loadedTable.getIndexes());
+
             Region region = env.getRegion();
             MetaDataMutationResult result = checkTableKeyInRegion(key, region);
             if (result != null) {
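
The load-then-check added to dropTable above also shows the endpoint's standard early-return convention when a precondition fails. Factored out as a sketch (the helper name is hypothetical; the builder and callback types are the generated MetaDataProtos/protobuf ones used throughout this file):

    // Hypothetical helper illustrating the early-return convention: set a
    // return code and mutation time, complete the RPC callback, and let the
    // caller return without mutating anything.
    private static void completeWith(MetaDataProtos.MetaDataResponse.Builder builder,
            com.google.protobuf.RpcCallback<MetaDataProtos.MetaDataResponse> done,
            MetaDataProtos.MutationCode code) {
        builder.setReturnCode(code);
        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
        done.run(builder.build());
    }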
@@ -1870,7 +1994,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 }
                 Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                 // Commit the list of deletion.
-                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                     HConstants.NO_NONCE);
                 long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
                 for (ImmutableBytesPtr ckey : invalidateList) {
@@ -1883,7 +2007,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             } finally {
-                region.releaseRowLocks(locks);
+                releaseRowLocks(region, locks);
             }
         } catch (Throwable t) {
           logger.error("dropTable failed", t);
@@ -1891,6 +2015,24 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
         }
     }
+
+    protected void releaseRowLocks(Region region, List<RowLock> locks) {
+        if (locks != null) {
+            region.releaseRowLocks(locks);
+        }
+    }
+
+    private RowLock acquireLock(Region region, byte[] lockKey, List<RowLock> locks) throws IOException {
+        RowLock rowLock = region.getRowLock(lockKey, false);
+        if (rowLock == null) {
+            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(lockKey));
+        }
+        }
+        if (locks != null) {
+            locks.add(rowLock);
+        }
+        return rowLock;
+    }
 
     private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
         byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
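
A quick usage sketch of the helper pair introduced above, using the same Region and RowLock types as this file: pass a list to have each lock tracked and released in bulk, or pass null and manage the single returned lock yourself.

    // Sketch: the intended call pattern for acquireLock/releaseRowLocks.
    List<RowLock> locks = Lists.newArrayList();
    try {
        acquireLock(region, key, locks);   // lock is appended to 'locks'
        // ... read or mutate metadata rows under the lock ...
    } finally {
        releaseRowLocks(region, locks);    // also safe when the list is null
    }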
@@ -2093,18 +2235,15 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
                 if (table.getTimeStamp() >= clientTimeStamp) {
-                    logger.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of " + clientTimeStamp);
+                    logger.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of "
+                            + clientTimeStamp);
                     return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                             EnvironmentEdgeManager.currentTimeMillis(), table);
-                } else if (isTableDeleted(table)) {
-                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
-                            EnvironmentEdgeManager.currentTimeMillis(), null);
-                }
-
-                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup
-                                                                                         // TABLE_SEQ_NUM
-                                                                                         // in
+                } else if (isTableDeleted(table)) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                        EnvironmentEdgeManager.currentTimeMillis(), null); }
+                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup TABLE_SEQ_NUM in
                                                                                          // tableMetaData
+
                 if (logger.isDebugEnabled()) {
                     logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
                             + expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
@@ -2139,7 +2278,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 if (result != null && result.getMutationCode()!=MutationCode.TABLE_ALREADY_EXISTS) {
                     return result;
                 }
-                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
                 // Invalidate from cache
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
                     metaDataCache.invalidate(invalidateKey);
@@ -2155,7 +2294,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table);
                 }
             } finally {
-                region.releaseRowLocks(locks);
+                releaseRowLocks(region, locks);
             }
         } catch (Throwable t) {
             ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
@@ -2971,6 +3110,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     PTableType type = table.getType();
                     byte[] tableHeaderRowKey = SchemaUtil.getTableKey(tenantId,
                             schemaName, tableName);
+                    byte[] cPhysicalTableName = table.getPhysicalName().getBytes();
+                    getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
+                            SchemaUtil.getTableName(schemaName, tableName), TableName.valueOf(cPhysicalTableName),
+                            getParentPhysicalTableName(table), type);
+
                     // Size for worst case - all new columns are PK column
                     List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size()));
                     if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
@@ -3124,10 +3268,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
                 QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         if (!wasLocked) {
-            rowLock = region.getRowLock(key, false);
-            if (rowLock == null) {
-                throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
-            }
+            rowLock = acquireLock(region, key, null);
         }
         try {
             PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
@@ -3184,16 +3325,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
          * Lock directly on key, though it may be an index table. This will just prevent a table
          * from getting rebuilt too often.
          */
-        List<RowLock> rowLocks = new ArrayList<Region.RowLock>(keys.size());;
+        List<RowLock> rowLocks = new ArrayList<RowLock>(keys.size());
         try {
-            rowLocks = new ArrayList<Region.RowLock>(keys.size());
             for (int i = 0; i < keys.size(); i++) {
-                Region.RowLock rowLock = region.getRowLock(keys.get(i), false);
-                if (rowLock == null) {
-                    throw new IOException("Failed to acquire lock on "
-                            + Bytes.toStringBinary(keys.get(i)));
-                }
-                rowLocks.add(rowLock);
+                acquireLock(region, keys.get(i), rowLocks);
             }
 
             List<PFunction> functionsAvailable = new ArrayList<PFunction>(keys.size());
@@ -3223,10 +3358,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             if(functionsAvailable.size() == numFunctions) return functionsAvailable;
             return null;
         } finally {
-            for (Region.RowLock lock : rowLocks) {
-                lock.release();
-            }
-            rowLocks.clear();
+            releaseRowLocks(region, rowLocks);
         }
     }
 
@@ -3248,6 +3380,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                     byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
                     boolean deletePKColumn = false;
+                    getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
+                            SchemaUtil.getTableName(schemaName, tableName),
+                            TableName.valueOf(table.getPhysicalName().getBytes()),
+                            getParentPhysicalTableName(table), table.getType());
+
                     List<Mutation> additionalTableMetaData = Lists.newArrayList();
                     
                     PTableType type = table.getType();
@@ -3480,7 +3617,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
             PIndexState newState =
                     PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
-            RowLock rowLock = region.getRowLock(key, false);
+            RowLock rowLock = acquireLock(region, key, null);
             if (rowLock == null) {
                 throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
             }
@@ -3502,6 +3639,22 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 Cell currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
                 boolean rowKeyOrderOptimizable = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES) != null;
 
+                // check permission on data table
+                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+                PTable loadedTable = loadTable(env, key, new ImmutableBytesPtr(key), clientTimeStamp, clientTimeStamp,
+                        request.getClientVersion());
+                if (loadedTable == null) {
+                    builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                    done.run(builder.build());
+                    return;
+                }
+                getCoprocessorHost().preIndexUpdate(Bytes.toString(tenantId),
+                        SchemaUtil.getTableName(schemaName, tableName),
+                        TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
+                        getParentPhysicalTableName(loadedTable),
+                        newState);
+
                 PIndexState currentState =
                         PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV.getValueOffset()]);
@@ -3611,7 +3764,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     if (setRowKeyOrderOptimizableCell) {
                         UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, key, timeStamp);
                     }
-                    region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+                    mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                         HConstants.NO_NONCE);
                     // Invalidate from cache
                     Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -3772,6 +3925,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         long clientTimeStamp = request.getClientTimestamp();
         List<RowLock> locks = Lists.newArrayList();
         try {
+            getCoprocessorHost().preGetSchema(schemaName);
             acquireLock(region, lockKey, locks);
             // Get as of latest timestamp so we can detect if we have a
             // newer schema that already
@@ -3802,7 +3956,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             done.run(builder.build());
             return;
         } finally {
-            region.releaseRowLocks(locks);
+            releaseRowLocks(region, locks);
         }
     }
 
@@ -3905,7 +4059,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 }
                 // Don't store function info for temporary functions.
                 if(!temporaryFunction) {
-                    region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    mutateRowsWithLocks(region, functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
                 }
 
                 // Invalidate the cache - the next getFunction call will add it
@@ -3919,7 +4073,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(builder.build());
                 return;
             } finally {
-                region.releaseRowLocks(locks);
+                releaseRowLocks(region, locks);
             }
         } catch (Throwable t) {
           logger.error("createFunction failed", t);
@@ -3958,7 +4112,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
                 }
-                region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(region, functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
 
                 Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                 long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData);
@@ -3971,7 +4125,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             } finally {
-                region.releaseRowLocks(locks);
+                releaseRowLocks(region, locks);
             }
         } catch (Throwable t) {
           logger.error("dropFunction failed", t);
@@ -4068,7 +4222,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         return;
                     }
                 }
-                region.mutateRowsWithLocks(schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+                mutateRowsWithLocks(region, schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                         HConstants.NO_NONCE);
 
                 // Invalidate the cache - the next getSchema call will add it
@@ -4086,7 +4240,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(builder.build());
                 return;
             } finally {
-                region.releaseRowLocks(locks);
+                releaseRowLocks(region, locks);
             }
         } catch (Throwable t) {
             logger.error("Creating the schema" + schemaName + "failed", t);
@@ -4100,6 +4254,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         try {
             List<Mutation> schemaMetaData = ProtobufUtil.getMutations(request);
             schemaName = request.getSchemaName();
+            getCoprocessorHost().preDropSchema(schemaName);
             byte[] lockKey = SchemaUtil.getSchemaKey(schemaName);
             Region region = env.getRegion();
             MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region);
@@ -4117,7 +4272,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
                 }
-                region.mutateRowsWithLocks(schemaMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+                mutateRowsWithLocks(region, schemaMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                         HConstants.NO_NONCE);
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env)
                         .getMetaDataCache();
@@ -4129,7 +4284,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             } finally {
-                region.releaseRowLocks(locks);
+                releaseRowLocks(region, locks);
             }
         } catch (Throwable t) {
             logger.error("drop schema failed:", t);
@@ -4175,4 +4330,48 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 null);
 
     }
+
+    private void mutateRowsWithLocks(final Region region, final List<Mutation> mutations, final Set<byte[]> rowsToLock,
+            final long nonceGroup, final long nonce) throws IOException {
+        // We need to mutate SYSTEM.CATALOG as the HBase login user when access checks are enabled.
+        if (this.accessCheckEnabled) {
+            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    final Call rpcContext = RpcUtil.getRpcContext();
+                    // Clear the RPC context so the mutation runs as the login user rather than the RPC caller
+                    try {
+                        RpcUtil.setRpcContext(null);
+                        region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+                    } catch (Throwable e) {
+                        throw new IOException(e);
+                    } finally {
+                        // Set the RPC context back to the original context of the RPC
+                        RpcUtil.setRpcContext(rpcContext);
+                    }
+                    return null;
+                }
+            });
+        } else {
+            region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+        }
+    }
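
This method is the crux of the write path: with accessCheckEnabled set, the SYSTEM.CATALOG mutation is re-executed as the HBase login (service) user, and the RPC context is cleared first so the write is not attributed to the unprivileged RPC caller. User.runAsLoginUser is the real HBase API used above; the wrapper below is only a stripped-down sketch of the same identity switch:

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.hbase.security.User;

    // Sketch: run a server-side write under the region server's own
    // credentials instead of the RPC caller's, as mutateRowsWithLocks does.
    final class RunAsLoginUserSketch {
        static void runAsService(final Runnable action) throws IOException {
            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    action.run(); // executes as the login user
                    return null;
                }
            });
        }
    }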
+
+    private TableName getParentPhysicalTableName(PTable table) {
+        return table
+                .getType() == PTableType.VIEW
+                        ? TableName.valueOf(table.getPhysicalName().getBytes())
+                        : table.getType() == PTableType.INDEX
+                                ? TableName
+                                        .valueOf(SchemaUtil
+                                                .getPhysicalHBaseTableName(table.getParentSchemaName(),
+                                                        table.getParentTableName(), table.isNamespaceMapped())
+                                                .getBytes())
+                                : TableName
+                                        .valueOf(SchemaUtil
+                                                .getPhysicalHBaseTableName(table.getSchemaName(),
+                                                        table.getTableName(), table.isNamespaceMapped())
+                                                .getBytes());
+    }
 }
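
For readability, the nested ternary in getParentPhysicalTableName above is equivalent to this unrolled if/else form (same logic, illustrative only):

    // Unrolled equivalent: a view reports its own physical table, an index
    // reports its parent's physical name, and anything else reports its own
    // schema-qualified physical name.
    private TableName getParentPhysicalTableName(PTable table) {
        if (table.getType() == PTableType.VIEW) {
            return TableName.valueOf(table.getPhysicalName().getBytes());
        }
        if (table.getType() == PTableType.INDEX) {
            return TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(
                    table.getParentSchemaName(), table.getParentTableName(),
                    table.isNamespaceMapped()).getBytes());
        }
        return TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(
                table.getSchemaName(), table.getTableName(),
                table.isNamespaceMapped()).getBytes());
    }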
