http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java index ddc2004..97f3f3d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java @@ -24,6 +24,7 @@ import org.apache.phoenix.schema.TableRef; public interface MutationPlan extends StatementPlan { - public MutationState execute() throws SQLException; - public TableRef getTargetRef(); + MutationState execute() throws SQLException; + TableRef getTargetRef(); + QueryPlan getQueryPlan(); } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index af2254b..287f9e0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -307,7 +307,7 @@ public class QueryCompiler { JoinSpec joinSpec = joinSpecs.get(i); context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes())); joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder - Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], true); + Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT); joinExpressions[i] = joinConditions.getFirst(); List<Expression> hashExpressions = joinConditions.getSecond(); Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); @@ -369,7 +369,7 @@ public class QueryCompiler { context.setCurrentTable(rhsTableRef); context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes())); ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)}; - Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true); + Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT); List<Expression> joinExpressions = joinConditions.getSecond(); List<Expression> 
hashExpressions = joinConditions.getFirst(); boolean needsMerge = lhsJoin.hasPostReference(); @@ -422,7 +422,7 @@ public class QueryCompiler { QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy); PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable(); - Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, false); + Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE); List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst(); List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index f7cdcbf..ca88984 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -26,6 +26,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; @@ -52,7 +53,9 @@ public interface QueryPlan extends StatementPlan { public ResultIterator 
iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException; public long getEstimatedSize(); - + + public Cost getCost(); + // TODO: change once joins are supported TableRef getTableRef(); /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 62e6991..2714858 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -46,6 +46,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.metrics.MetricInfo; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.LiteralParseNode; import org.apache.phoenix.parse.ParseNodeFactory; @@ -194,6 +195,11 @@ public class TraceQueryPlan implements QueryPlan { } @Override + public Cost getCost() { + return Cost.ZERO; + } + + @Override public Set<TableRef> getSourceRefs() { return Collections.emptySet(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 9eaaf62..a81a427 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -47,6 +47,7 @@ import 
org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.execute.MutationState.RowTimestampColInfo; import org.apache.phoenix.expression.Determinism; @@ -116,9 +117,10 @@ import com.google.common.collect.Sets; public class UpsertCompiler { private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, - PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation, + PTable table, MultiRowMutationState mutation, PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer, byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException { + long columnValueSize = 0; Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the first byte is the salting byte, set to an empty array @@ -148,6 +150,7 @@ public class UpsertCompiler { } } else { columnValues.put(column, value); + columnValueSize += (column.getEstimatedSize() + value.length); } } ImmutableBytesPtr ptr = new ImmutableBytesPtr(); @@ -166,7 +169,7 @@ public class UpsertCompiler { regionPrefix.length)); } } - mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); + mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); } public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector, @@ -195,7 +198,7 @@ public class UpsertCompiler { } } int rowCount = 0; - Map<ImmutableBytesPtr, 
RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); + MultiRowMutationState mutation = new MultiRowMutationState(batchSize); PTable table = tableRef.getTable(); IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; @@ -695,173 +698,13 @@ public class UpsertCompiler { // Ignore order by - it has no impact final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); - return new MutationPlan() { - @Override - public ParameterMetaData getParameterMetaData() { - return queryPlan.getContext().getBindManager().getParameterMetaData(); - } - - @Override - public StatementContext getContext() { - return queryPlan.getContext(); - } - - @Override - public TableRef getTargetRef() { - return tableRef; - } - - @Override - public Set<TableRef> getSourceRefs() { - return originalQueryPlan.getSourceRefs(); - } - - @Override - public Operation getOperation() { - return operation; - } - - @Override - public MutationState execute() throws SQLException { - ImmutableBytesWritable ptr = context.getTempPtr(); - PTable table = tableRef.getTable(); - table.getIndexMaintainers(ptr, context.getConnection()); - byte[] txState = table.isTransactional() ? 
connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; - - if (ptr.getLength() > 0) { - byte[] uuidValue = ServerCacheClient.generateId(); - scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); - scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - } - ResultIterator iterator = aggPlan.iterator(); - try { - Tuple row = iterator.next(); - final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, - PLong.INSTANCE, ptr); - return new MutationState(maxSize, maxSizeBytes, connection) { - @Override - public long getUpdateCount() { - return mutationCount; - } - }; - } finally { - iterator.close(); - } - - } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); - List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("UPSERT ROWS"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } - - @Override - public Long getEstimatedRowsToScan() throws SQLException { - return aggPlan.getEstimatedRowsToScan(); - } - - @Override - public Long getEstimatedBytesToScan() throws SQLException { - return aggPlan.getEstimatedBytesToScan(); - } - - @Override - public Long getEstimateInfoTimestamp() throws SQLException { - return aggPlan.getEstimateInfoTimestamp(); - } - }; + return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes); } } //////////////////////////////////////////////////////////////////// // UPSERT SELECT run client-side ///////////////////////////////////////////////////////////////////// - return new MutationPlan() { - @Override - public ParameterMetaData getParameterMetaData() { - return queryPlan.getContext().getBindManager().getParameterMetaData(); - } - - @Override - public 
StatementContext getContext() { - return queryPlan.getContext(); - } - - @Override - public TableRef getTargetRef() { - return tableRef; - } - - @Override - public Set<TableRef> getSourceRefs() { - return originalQueryPlan.getSourceRefs(); - } - - @Override - public Operation getOperation() { - return operation; - } - - @Override - public MutationState execute() throws SQLException { - ResultIterator iterator = queryPlan.iterator(); - if (parallelIteratorFactory == null) { - return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false); - } - try { - parallelIteratorFactory.setRowProjector(projector); - parallelIteratorFactory.setColumnIndexes(columnIndexes); - parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes); - Tuple tuple; - long totalRowCount = 0; - StatementContext context = queryPlan.getContext(); - while ((tuple=iterator.next()) != null) {// Runs query - Cell kv = tuple.getValue(0); - totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault()); - } - // Return total number of rows that have been updated. In the case of auto commit being off - // the mutations will all be in the mutation state of the current connection. - MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount); - /* - * All the metrics collected for measuring the reads done by the parallel mutating iterators - * is included in the ReadMetricHolder of the statement context. Include these metrics in the - * returned mutation state so they can be published on commit. 
- */ - mutationState.setReadMetricQueue(context.getReadMetricsQueue()); - return mutationState; - } finally { - iterator.close(); - } - } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); - List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("UPSERT SELECT"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } - - @Override - public Long getEstimatedRowsToScan() throws SQLException { - return queryPlan.getEstimatedRowsToScan(); - } - - @Override - public Long getEstimatedBytesToScan() throws SQLException { - return queryPlan.getEstimatedBytesToScan(); - } - - @Override - public Long getEstimateInfoTimestamp() throws SQLException { - return queryPlan.getEstimateInfoTimestamp(); - } - }; + return new ClientUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, parallelIteratorFactory, projector, columnIndexes, pkSlotIndexes, useServerTimestamp, maxSize, maxSizeBytes); } @@ -987,124 +830,9 @@ public class UpsertCompiler { } final byte[] onDupKeyBytes = onDupKeyBytesToBe; - return new MutationPlan() { - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } - - @Override - public StatementContext getContext() { - return context; - } - - @Override - public TableRef getTargetRef() { - return tableRef; - } - - @Override - public Set<TableRef> getSourceRefs() { - return Collections.emptySet(); - } - - @Override - public Operation getOperation() { - return operation; - } - - @Override - public MutationState execute() throws SQLException { - ImmutableBytesWritable ptr = context.getTempPtr(); - final SequenceManager sequenceManager = context.getSequenceManager(); - // Next evaluate all the expressions - int nodeIndex = nodeIndexOffset; - PTable table = tableRef.getTable(); - Tuple tuple = sequenceManager.getSequenceCount() == 0 
? null : - sequenceManager.newSequenceTuple(null); - for (Expression constantExpression : constantExpressions) { - PColumn column = allColumns.get(columnIndexes[nodeIndex]); - constantExpression.evaluate(tuple, ptr); - Object value = null; - if (constantExpression.getDataType() != null) { - value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(), constantExpression.getMaxLength(), constantExpression.getScale()); - if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) { - throw TypeMismatchException.newException( - constantExpression.getDataType(), column.getDataType(), "expression: " - + constantExpression.toString() + " in column " + column); - } - if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(), - constantExpression.getSortOrder(), constantExpression.getMaxLength(), - constantExpression.getScale(), column.getMaxLength(), column.getScale())) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString()) - .setMessage("value=" + constantExpression.toString()).build().buildException(); - } - } - column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(), - constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(), - column.getMaxLength(), column.getScale(),column.getSortOrder(), - table.rowKeyOrderOptimizable()); - if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN) - .setColumnName(column.getName().getString()) - .setMessage("value=" + constantExpression.toString()).build().buildException(); - } - values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr); - nodeIndex++; - } - // Add columns based on view - for (PColumn column : 
addViewColumns) { - if (IndexUtil.getViewConstantValue(column, ptr)) { - values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr); - } else { - throw new IllegalStateException(); - } - } - Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1); - IndexMaintainer indexMaintainer = null; - byte[][] viewConstants = null; - if (table.getIndexType() == IndexType.LOCAL) { - PTable parentTable = - statement - .getConnection() - .getMetaDataCache() - .getTableRef( - new PTableKey(statement.getConnection().getTenantId(), - table.getParentName().getString())).getTable(); - indexMaintainer = table.getIndexMaintainer(parentTable, connection); - viewConstants = IndexUtil.getViewConstants(parentTable); - } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0); - return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection); - } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List<String> planSteps = Lists.newArrayListWithExpectedSize(2); - if (context.getSequenceManager().getSequenceCount() > 0) { - planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES"); - } - planSteps.add("PUT SINGLE ROW"); - return new ExplainPlan(planSteps); - } - - @Override - public Long getEstimatedRowsToScan() throws SQLException { - return 0l; - } - - @Override - public Long getEstimatedBytesToScan() throws SQLException { - return 0l; - } - - @Override - public Long getEstimateInfoTimestamp() throws SQLException { - return 0l; - } - }; + return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressions, + allColumns, columnIndexes, overlapViewColumns, values, addViewColumns, + connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, maxSize, maxSizeBytes); } private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable table) { @@ -1215,4 +943,394 
@@ public class UpsertCompiler { } } } + + private class ServerUpsertSelectMutationPlan implements MutationPlan { + private final QueryPlan queryPlan; + private final TableRef tableRef; + private final QueryPlan originalQueryPlan; + private final StatementContext context; + private final PhoenixConnection connection; + private final Scan scan; + private final QueryPlan aggPlan; + private final RowProjector aggProjector; + private final int maxSize; + private final int maxSizeBytes; + + public ServerUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan, + StatementContext context, PhoenixConnection connection, + Scan scan, QueryPlan aggPlan, RowProjector aggProjector, + int maxSize, int maxSizeBytes) { + this.queryPlan = queryPlan; + this.tableRef = tableRef; + this.originalQueryPlan = originalQueryPlan; + this.context = context; + this.connection = connection; + this.scan = scan; + this.aggPlan = aggPlan; + this.aggProjector = aggProjector; + this.maxSize = maxSize; + this.maxSizeBytes = maxSizeBytes; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return queryPlan.getContext().getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return queryPlan.getContext(); + } + + @Override + public TableRef getTargetRef() { + return tableRef; + } + + @Override + public QueryPlan getQueryPlan() { + return aggPlan; + } + + @Override + public Set<TableRef> getSourceRefs() { + return originalQueryPlan.getSourceRefs(); + } + + @Override + public Operation getOperation() { + return operation; + } + + @Override + public MutationState execute() throws SQLException { + ImmutableBytesWritable ptr = context.getTempPtr(); + PTable table = tableRef.getTable(); + table.getIndexMaintainers(ptr, context.getConnection()); + byte[] txState = table.isTransactional() ? 
+ connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; + + if (ptr.getLength() > 0) { + byte[] uuidValue = ServerCacheClient.generateId(); + scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } + ResultIterator iterator = aggPlan.iterator(); + try { + Tuple row = iterator.next(); + final long mutationCount = (Long) aggProjector.getColumnProjector(0).getValue(row, + PLong.INSTANCE, ptr); + return new MutationState(maxSize, maxSizeBytes, connection) { + @Override + public long getUpdateCount() { + return mutationCount; + } + }; + } finally { + iterator.close(); + } + + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("UPSERT ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + + @Override + public Long getEstimatedRowsToScan() throws SQLException { + return aggPlan.getEstimatedRowsToScan(); + } + + @Override + public Long getEstimatedBytesToScan() throws SQLException { + return aggPlan.getEstimatedBytesToScan(); + } + + @Override + public Long getEstimateInfoTimestamp() throws SQLException { + return aggPlan.getEstimateInfoTimestamp(); + } + } + + private class UpsertValuesMutationPlan implements MutationPlan { + private final StatementContext context; + private final TableRef tableRef; + private final int nodeIndexOffset; + private final List<Expression> constantExpressions; + private final List<PColumn> allColumns; + private final int[] columnIndexes; + private final Set<PColumn> overlapViewColumns; + private final byte[][] values; + private final Set<PColumn> addViewColumns; + private final PhoenixConnection connection; + private final int[] pkSlotIndexes; + 
private final boolean useServerTimestamp; + private final byte[] onDupKeyBytes; + private final int maxSize; + private final int maxSizeBytes; + + public UpsertValuesMutationPlan(StatementContext context, TableRef tableRef, int nodeIndexOffset, + List<Expression> constantExpressions, List<PColumn> allColumns, + int[] columnIndexes, Set<PColumn> overlapViewColumns, byte[][] values, + Set<PColumn> addViewColumns, PhoenixConnection connection, + int[] pkSlotIndexes, boolean useServerTimestamp, byte[] onDupKeyBytes, + int maxSize, int maxSizeBytes) { + this.context = context; + this.tableRef = tableRef; + this.nodeIndexOffset = nodeIndexOffset; + this.constantExpressions = constantExpressions; + this.allColumns = allColumns; + this.columnIndexes = columnIndexes; + this.overlapViewColumns = overlapViewColumns; + this.values = values; + this.addViewColumns = addViewColumns; + this.connection = connection; + this.pkSlotIndexes = pkSlotIndexes; + this.useServerTimestamp = useServerTimestamp; + this.onDupKeyBytes = onDupKeyBytes; + this.maxSize = maxSize; + this.maxSizeBytes = maxSizeBytes; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public TableRef getTargetRef() { + return tableRef; + } + + @Override + public QueryPlan getQueryPlan() { + return null; + } + + @Override + public Set<TableRef> getSourceRefs() { + return Collections.emptySet(); + } + + @Override + public Operation getOperation() { + return operation; + } + + @Override + public MutationState execute() throws SQLException { + ImmutableBytesWritable ptr = context.getTempPtr(); + final SequenceManager sequenceManager = context.getSequenceManager(); + // Next evaluate all the expressions + int nodeIndex = nodeIndexOffset; + PTable table = tableRef.getTable(); + Tuple tuple = sequenceManager.getSequenceCount() == 0 ? 
null : + sequenceManager.newSequenceTuple(null); + for (Expression constantExpression : constantExpressions) { + PColumn column = allColumns.get(columnIndexes[nodeIndex]); + constantExpression.evaluate(tuple, ptr); + Object value = null; + if (constantExpression.getDataType() != null) { + value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(), + constantExpression.getMaxLength(), constantExpression.getScale()); + if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) { + throw TypeMismatchException.newException( + constantExpression.getDataType(), column.getDataType(), "expression: " + + constantExpression.toString() + " in column " + column); + } + if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(), + constantExpression.getSortOrder(), constantExpression.getMaxLength(), + constantExpression.getScale(), column.getMaxLength(), column.getScale())) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString()) + .setMessage("value=" + constantExpression.toString()).build().buildException(); + } + } + column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(), + constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(), + column.getMaxLength(), column.getScale(),column.getSortOrder(), + table.rowKeyOrderOptimizable()); + if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN) + .setColumnName(column.getName().getString()) + .setMessage("value=" + constantExpression.toString()).build().buildException(); + } + values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr); + nodeIndex++; + } + // Add columns based on view + for (PColumn column : 
addViewColumns) {
+                if (IndexUtil.getViewConstantValue(column, ptr)) {
+                    values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                } else {
+                    throw new IllegalStateException();
+                }
+            }
+            MultiRowMutationState mutation = new MultiRowMutationState(1);
+            IndexMaintainer indexMaintainer = null;
+            byte[][] viewConstants = null;
+            if (table.getIndexType() == IndexType.LOCAL) {
+                PTable parentTable =
+                        statement
+                                .getConnection()
+                                .getMetaDataCache()
+                                .getTableRef(
+                                        new PTableKey(statement.getConnection().getTenantId(),
+                                                table.getParentName().getString())).getTable();
+                indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+                viewConstants = IndexUtil.getViewConstants(parentTable);
+            }
+            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
+            return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
+            if (context.getSequenceManager().getSequenceCount() > 0) {
+                planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
+            }
+            planSteps.add("PUT SINGLE ROW");
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return 0l;
+        }
+    }
+
+    /**
+     * {@link MutationPlan} whose explain plan is "UPSERT SELECT" followed by the steps of the
+     * wrapped {@code queryPlan}.
+     * <p>
+     * Parameter metadata, the statement context and the row/byte/timestamp estimates are all
+     * delegated to {@code queryPlan}; the source table references are taken from
+     * {@code originalQueryPlan}.
+     * <p>
+     * This is a non-static inner class: {@code statement}, {@code operation} and
+     * {@code upsertSelect(...)} are not declared here and resolve in the enclosing scope.
+     */
+    private class ClientUpsertSelectMutationPlan implements MutationPlan {
+        // Plan that is actually iterated by execute() and that supplies context and estimates.
+        private final QueryPlan queryPlan;
+        private final TableRef tableRef;
+        // Only consulted for getSourceRefs().
+        private final QueryPlan originalQueryPlan;
+        // May be null; execute() then falls back to upsertSelect(...).
+        private final UpsertingParallelIteratorFactory parallelIteratorFactory;
+        private final RowProjector projector;
+        private final int[] columnIndexes;
+        private final int[] pkSlotIndexes;
+        private final boolean useServerTimestamp;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        /** Stores every argument in the field of the same name; no validation is performed. */
+        public ClientUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan, UpsertingParallelIteratorFactory parallelIteratorFactory, RowProjector projector, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, int maxSize, int maxSizeBytes) {
+            this.queryPlan = queryPlan;
+            this.tableRef = tableRef;
+            this.originalQueryPlan = originalQueryPlan;
+            this.parallelIteratorFactory = parallelIteratorFactory;
+            this.projector = projector;
+            this.columnIndexes = columnIndexes;
+            this.pkSlotIndexes = pkSlotIndexes;
+            this.useServerTimestamp = useServerTimestamp;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return queryPlan.getContext().getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return queryPlan.getContext();
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return queryPlan;
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return originalQueryPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+            return operation;
+        }
+
+        /**
+         * Runs {@code queryPlan} and returns the resulting mutation state.
+         * <p>
+         * When no parallel iterator factory was supplied, the iterator is handed to
+         * {@code upsertSelect(...)} together with a new {@link StatementContext} and that result
+         * is returned. Otherwise the factory is given the projector and the column / PK-slot
+         * indexes, the iterator is drained, and the long decoded from the first cell of every
+         * tuple is summed into the row count of the returned {@link MutationState}.
+         * <p>
+         * NOTE(review): this method closes the iterator only on the factory path (the early
+         * return happens before the try/finally); presumably {@code upsertSelect(...)} closes it
+         * on the other path - confirm.
+         *
+         * @throws SQLException propagated from the iterator or from {@code upsertSelect(...)}
+         */
+        @Override
+        public MutationState execute() throws SQLException {
+            ResultIterator iterator = queryPlan.iterator();
+            if (parallelIteratorFactory == null) {
+                return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
+            }
+            try {
+                parallelIteratorFactory.setRowProjector(projector);
+                parallelIteratorFactory.setColumnIndexes(columnIndexes);
+                parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+                Tuple tuple;
+                long totalRowCount = 0;
+                StatementContext context = queryPlan.getContext();
+                while ((tuple=iterator.next()) != null) {// Runs query
+                    Cell kv = tuple.getValue(0);
+                    totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                }
+                // Return total number of rows that have been updated. In the case of auto commit being off
+                // the mutations will all be in the mutation state of the current connection.
+                MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
+                /*
+                 * All the metrics collected for measuring the reads done by the parallel mutating iterators
+                 * are included in the ReadMetricHolder of the statement context. Include these metrics in the
+                 * returned mutation state so they can be published on commit.
+                 */
+                mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+                return mutationState;
+            } finally {
+                iterator.close();
+            }
+        }
+
+        /** Returns an explain plan consisting of "UPSERT SELECT" followed by the query plan's own steps. */
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("UPSERT SELECT");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return queryPlan.getEstimatedRowsToScan();
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return queryPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return queryPlan.getEstimateInfoTimestamp();
+        }
+    }
 }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 5e7b996..7bf8259 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -326,11 +326,6 @@ public class WhereOptimizer {
         PTable table = context.getCurrentTable().getTable();
         for (int i = 0; i < expressions.size(); i++) {
             Expression expression = expressions.get(i);
-            // TODO this is a temporary fix for PHOENIX-3029.
-            if (expression instanceof CoerceExpression
-                    && expression.getSortOrder() != expression.getChildren().get(0).getSortOrder()) {
-                continue;
-            }
             KeyExpressionVisitor visitor = new KeyExpressionVisitor(context, table);
             KeyExpressionVisitor.KeySlots keySlots = expression.accept(visitor);
             int minPkPos = Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
new file mode 100644
index 0000000..8decc8c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+
+/**
+ * {@link MetaDataEndpointObserver} implementation in which every callback has an empty body.
+ * <p>
+ * NOTE(review): as the name suggests, this looks intended as a convenience base class so that
+ * concrete observers override only the hooks they need - confirm against the implementers.
+ */
+public class BaseMetaDataEndpointObserver implements MetaDataEndpointObserver {
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preGetTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
+            Set<byte[]> familySet, Set<TableName> indexes) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
+            List<PTable> indexes) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preAlterTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType type) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preGetSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+            throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preCreateSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+            throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preDropSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preCreateFunction(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String functionName) throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preDropFunction(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, String functionName)
+            throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preGetFunctions(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, String functionName)
+            throws IOException {
+        // no-op
+    }
+
+    @Override
+    public void preIndexUpdate(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String indexName, TableName physicalTableName, TableName parentPhysicalTableName, PIndexState newState)
+            throws IOException {
+        // no-op
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index d05ab79..afbd63f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -84,6 +84,7 @@ import static org.apache.phoenix.util.SchemaUtil.getVarCharLength; import static org.apache.phoenix.util.SchemaUtil.getVarChars; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; @@ -91,10 +92,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -105,6 +108,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; @@ -121,9 +125,12 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.RpcServer.Call; +import org.apache.hadoop.hbase.ipc.RpcUtil; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.VersionInfo; @@ -452,7 +459,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int 
DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV); private static final int MIN_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV); private static final int MAX_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV); - + private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) { if (keyLength <= 0) { return null; @@ -463,6 +470,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private RegionCoprocessorEnvironment env; + private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost; + private boolean accessCheckEnabled; + /** * Stores a reference to the coprocessor environment provided by the * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this @@ -480,6 +490,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } else { throw new CoprocessorException("Must be loaded on a table region!"); } + + phoenixAccessCoprocessorHost = new PhoenixMetaDataCoprocessorHost(this.env); + this.accessCheckEnabled = env.getConfiguration().getBoolean(QueryServices.PHOENIX_ACLS_ENABLED, + QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED); logger.info("Starting Tracing-Metrics Systems"); // Start the phoenix trace collection Tracing.addTraceMetricsSource(); @@ -523,6 +537,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } + getCoprocessorHost().preGetTable(Bytes.toString(tenantId), SchemaUtil.getTableName(schemaName, tableName), + TableName.valueOf(table.getPhysicalName().getBytes())); + builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS); long disableIndexTimestamp = table.getIndexDisableTimestamp(); long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? 
disableIndexTimestamp : Long.MAX_VALUE; @@ -554,6 +571,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } + private PhoenixMetaDataCoprocessorHost getCoprocessorHost() { + return phoenixAccessCoprocessorHost; + } + private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region, long clientTimeStamp, int clientVersion) throws IOException, SQLException { Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp); @@ -1317,12 +1338,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso * @return null if the physical table row information is not present. * */ - private static Mutation getPhysicalTableForView(List<Mutation> tableMetadata, byte[][] parentSchemaTableNames) { + private static Mutation getPhysicalTableRowForView(List<Mutation> tableMetadata, byte[][] parentTenantSchemaTableNames, byte[][] physicalSchemaTableNames) { int size = tableMetadata.size(); byte[][] rowKeyMetaData = new byte[3][]; MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData); Mutation physicalTableRow = null; + Mutation parentTableRow = null; boolean physicalTableLinkFound = false; + boolean parentTableLinkFound = false; if (size >= 2) { int i = size - 1; while (i >= 1) { @@ -1332,28 +1355,51 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (linkType == LinkType.PHYSICAL_TABLE) { physicalTableRow = m; physicalTableLinkFound = true; - break; } + if (linkType == LinkType.PARENT_TABLE) { + parentTableRow=m; + parentTableLinkFound = true; + } + } + if(physicalTableLinkFound && parentTableLinkFound){ + break; } i--; } } + if (!parentTableLinkFound) { + parentTenantSchemaTableNames[0] = null; + parentTenantSchemaTableNames[1] = null; + parentTenantSchemaTableNames[2] = null; + + } if (!physicalTableLinkFound) { - parentSchemaTableNames[0] = null; - parentSchemaTableNames[1] = null; - return null; + 
physicalSchemaTableNames[0] = null; + physicalSchemaTableNames[1] = null; + physicalSchemaTableNames[2] = null; + } + if (physicalTableLinkFound) { + getSchemaTableNames(physicalTableRow,physicalSchemaTableNames); + } + if (parentTableLinkFound) { + getSchemaTableNames(parentTableRow,parentTenantSchemaTableNames); } - rowKeyMetaData = new byte[5][]; - getVarChars(physicalTableRow.getRow(), 5, rowKeyMetaData); + return physicalTableRow; + } + + private static void getSchemaTableNames(Mutation row, byte[][] schemaTableNames) { + byte[][] rowKeyMetaData = new byte[5][]; + getVarChars(row.getRow(), 5, rowKeyMetaData); + byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; byte[] colBytes = rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]; byte[] famBytes = rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]; if ((colBytes == null || colBytes.length == 0) && (famBytes != null && famBytes.length > 0)) { byte[] sName = SchemaUtil.getSchemaNameFromFullName(famBytes).getBytes(); byte[] tName = SchemaUtil.getTableNameFromFullName(famBytes).getBytes(); - parentSchemaTableNames[0] = sName; - parentSchemaTableNames[1] = tName; + schemaTableNames[0]= tenantId; + schemaTableNames[1] = sName; + schemaTableNames[2] = tName; } - return physicalTableRow; } @Override @@ -1370,25 +1416,76 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; - + boolean isNamespaceMapped = MetaDataUtil.isNameSpaceMapped(tableMetadata, GenericKeyValueBuilder.INSTANCE, + new ImmutableBytesWritable()); + final IndexType indexType = MetaDataUtil.getIndexType(tableMetadata, GenericKeyValueBuilder.INSTANCE, + new ImmutableBytesWritable()); byte[] parentSchemaName = null; byte[] parentTableName = null; PTableType tableType = 
MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable()); byte[] parentTableKey = null; Mutation viewPhysicalTableRow = null; + Set<TableName> indexes = new HashSet<TableName>();; + byte[] cPhysicalName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped) + .getBytes(); + byte[] cParentPhysicalName=null; if (tableType == PTableType.VIEW) { - byte[][] parentSchemaTableNames = new byte[2][]; + byte[][] parentSchemaTableNames = new byte[3][]; + byte[][] parentPhysicalSchemaTableNames = new byte[3][]; /* * For a view, we lock the base physical table row. For a mapped view, there is * no link present to the physical table. So the viewPhysicalTableRow is null * in that case. */ - viewPhysicalTableRow = getPhysicalTableForView(tableMetadata, parentSchemaTableNames); - parentSchemaName = parentSchemaTableNames[0]; - parentTableName = parentSchemaTableNames[1]; - if (parentTableName != null) { - parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, parentSchemaName, parentTableName); + + viewPhysicalTableRow = getPhysicalTableRowForView(tableMetadata, parentSchemaTableNames,parentPhysicalSchemaTableNames); + long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata); + if (parentPhysicalSchemaTableNames[2] != null) { + + parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, + parentPhysicalSchemaTableNames[1], parentPhysicalSchemaTableNames[2]); + PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey), + clientTimeStamp, clientTimeStamp, clientVersion); + if (parentTable == null) { + builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + return; + } + cParentPhysicalName = parentTable.getPhysicalName().getBytes(); + if (parentSchemaTableNames[2] != null + && Bytes.compareTo(parentSchemaTableNames[2], 
parentPhysicalSchemaTableNames[2]) != 0) { + // if view is created on view + byte[] parentKey = SchemaUtil.getTableKey( + parentSchemaTableNames[0] == null ? ByteUtil.EMPTY_BYTE_ARRAY : parentSchemaTableNames[0], + parentSchemaTableNames[1], parentSchemaTableNames[2]); + parentTable = loadTable(env, parentKey, new ImmutableBytesPtr(parentKey), + clientTimeStamp, clientTimeStamp, clientVersion); + if (parentTable == null) { + // it could be a global view + parentKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, + parentSchemaTableNames[1], parentSchemaTableNames[2]); + parentTable = loadTable(env, parentKey, new ImmutableBytesPtr(parentKey), + clientTimeStamp, clientTimeStamp, clientVersion); + } + } + if (parentTable == null) { + builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + return; + } + for (PTable index : parentTable.getIndexes()) { + indexes.add(TableName.valueOf(index.getPhysicalName().getBytes())); + } + + } else { + // Mapped View + cParentPhysicalName = SchemaUtil.getTableNameAsBytes(schemaName, tableName); } + parentSchemaName = parentPhysicalSchemaTableNames[1]; + parentTableName = parentPhysicalSchemaTableNames[2]; + } else if (tableType == PTableType.INDEX) { parentSchemaName = schemaName; /* @@ -1398,7 +1495,27 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso */ parentTableName = MetaDataUtil.getParentTableName(tableMetadata); parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName); + long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata); + PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey), + clientTimeStamp, clientTimeStamp, clientVersion); + if (IndexType.LOCAL == indexType) { + cPhysicalName = parentTable.getPhysicalName().getBytes(); + 
cParentPhysicalName=parentTable.getPhysicalName().getBytes(); + } else if (parentTable.getType() == PTableType.VIEW) { + cPhysicalName = MetaDataUtil.getViewIndexPhysicalName(parentTable.getPhysicalName().getBytes()); + cParentPhysicalName = parentTable.getPhysicalName().getBytes(); + }else{ + cParentPhysicalName = SchemaUtil + .getPhysicalHBaseTableName(parentSchemaName, parentTableName, isNamespaceMapped).getBytes(); + } } + + getCoprocessorHost().preCreateTable(Bytes.toString(tenantIdBytes), + SchemaUtil.getTableName(schemaName, tableName), + (tableType == PTableType.VIEW) ? null : TableName.valueOf(cPhysicalName), + cParentPhysicalName == null ? null : TableName.valueOf(cParentPhysicalName), tableType, + /* TODO: During inital create we may not need the family map */ + Collections.<byte[]> emptySet(), indexes); Region region = env.getRegion(); List<RowLock> locks = Lists.newArrayList(); @@ -1613,7 +1730,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // primary and then index table locks are held, in that order). For now, we just don't support // indexing on the system table. This is an issue because of the way we manage batch mutation // in the Indexer. 
- region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); + mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); // Invalidate the cache - the next getTable call will add it // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache @@ -1632,7 +1749,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } finally { - region.releaseRowLocks(locks); + releaseRowLocks(region,locks); } } catch (Throwable t) { logger.error("createTable failed", t); @@ -1648,16 +1765,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso QueryServicesOptions.DEFAULT_MAX_INDEXES_PER_TABLE); } - private static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks) - throws IOException { - RowLock rowLock = region.getRowLock(key, false); - if (rowLock == null) { - throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); - } - locks.add(rowLock); - return rowLock; - } - private static final byte[] CHILD_TABLE_BYTES = new byte[] {PTable.LinkType.CHILD_TABLE.getSerializedValue()}; @@ -1846,6 +1953,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso parentTableName == null ? 
lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName); + + PTableType ptableType=PTableType.fromSerializedValue(tableType); + long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata); + byte[] cKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName); + PTable loadedTable = loadTable(env, cKey, new ImmutableBytesPtr(cKey), clientTimeStamp, clientTimeStamp, + request.getClientVersion()); + if (loadedTable == null) { + builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + return; + } + getCoprocessorHost().preDropTable(Bytes.toString(tenantIdBytes), + SchemaUtil.getTableName(schemaName, tableName), + TableName.valueOf(loadedTable.getPhysicalName().getBytes()), + getParentPhysicalTableName(loadedTable), ptableType,loadedTable.getIndexes()); + Region region = env.getRegion(); MetaDataMutationResult result = checkTableKeyInRegion(key, region); if (result != null) { @@ -1870,7 +1994,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); // Commit the list of deletion. 
-            region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+            mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                 HConstants.NO_NONCE);
             long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
             for (ImmutableBytesPtr ckey : invalidateList) {
@@ -1883,7 +2007,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             } finally {
-                region.releaseRowLocks(locks);
+                releaseRowLocks(region,locks);
             }
         } catch (Throwable t) {
             logger.error("dropTable failed", t);
@@ -1891,6 +2015,24 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
         }
     }
+
+    /**
+     * Releases the given row locks through {@code region.releaseRowLocks}.
+     * Does nothing when {@code locks} is null.
+     */
+    protected void releaseRowLocks(Region region, List<RowLock> locks) {
+        if (locks != null) {
+            region.releaseRowLocks(locks);
+        }
+    }
+
+    /**
+     * Obtains the row lock for {@code lockKey} from {@code region}.
+     *
+     * @param locks optional collector; when non-null the acquired lock is appended to it
+     * @return the acquired lock (never null, a null lock is turned into an exception)
+     * @throws IOException if {@code region.getRowLock} returns null
+     */
+    private RowLock acquireLock(Region region, byte[] lockKey, List<RowLock> locks) throws IOException {
+        // A null result from getRowLock is treated as a failure to acquire the lock.
+        RowLock rowLock = region.getRowLock(lockKey, false);
+        if (rowLock == null) {
+            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(lockKey));
+        }
+        if (locks != null) {
+            locks.add(rowLock);
+        }
+        return rowLock;
+    }
 
     private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName,
         byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
@@ -2093,18 +2235,15 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         EnvironmentEdgeManager.currentTimeMillis(), null);
             }
             if (table.getTimeStamp() >= clientTimeStamp) {
-                logger.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of " + clientTimeStamp);
+                logger.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of "
+
+ clientTimeStamp); return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table); - } else if (isTableDeleted(table)) { - return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, - EnvironmentEdgeManager.currentTimeMillis(), null); - } - - long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup - // TABLE_SEQ_NUM - // in + } else if (isTableDeleted(table)) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, + EnvironmentEdgeManager.currentTimeMillis(), null); } + long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup TABLE_SEQ_NUM in // tableMetaData + if (logger.isDebugEnabled()) { logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum " + expectedSeqNum + " and found seqNum " + table.getSequenceNumber() @@ -2139,7 +2278,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (result != null && result.getMutationCode()!=MutationCode.TABLE_ALREADY_EXISTS) { return result; } - region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); + mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); // Invalidate from cache for (ImmutableBytesPtr invalidateKey : invalidateList) { metaDataCache.invalidate(invalidateKey); @@ -2155,7 +2294,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table); } } finally { - region.releaseRowLocks(locks); + releaseRowLocks(region,locks); } } catch (Throwable t) { ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t); @@ -2971,6 +3110,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PTableType type = table.getType(); byte[] tableHeaderRowKey = 
SchemaUtil.getTableKey(tenantId, schemaName, tableName); + byte[] cPhysicalTableName=table.getPhysicalName().getBytes(); + getCoprocessorHost().preAlterTable(Bytes.toString(tenantId), + SchemaUtil.getTableName(schemaName, tableName), TableName.valueOf(cPhysicalTableName), + getParentPhysicalTableName(table),type); + // Size for worst case - all new columns are PK column List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size())); if (type == PTableType.TABLE || type == PTableType.SYSTEM) { @@ -3124,10 +3268,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); if (!wasLocked) { - rowLock = region.getRowLock(key, false); - if (rowLock == null) { - throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); - } + rowLock = acquireLock(region, key, null); } try { PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); @@ -3184,16 +3325,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso * Lock directly on key, though it may be an index table. This will just prevent a table * from getting rebuilt too often. 
*/ - List<RowLock> rowLocks = new ArrayList<Region.RowLock>(keys.size());; + List<RowLock> rowLocks = new ArrayList<RowLock>(keys.size());; try { - rowLocks = new ArrayList<Region.RowLock>(keys.size()); for (int i = 0; i < keys.size(); i++) { - Region.RowLock rowLock = region.getRowLock(keys.get(i), false); - if (rowLock == null) { - throw new IOException("Failed to acquire lock on " - + Bytes.toStringBinary(keys.get(i))); - } - rowLocks.add(rowLock); + acquireLock(region, keys.get(i), rowLocks); } List<PFunction> functionsAvailable = new ArrayList<PFunction>(keys.size()); @@ -3223,10 +3358,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if(functionsAvailable.size() == numFunctions) return functionsAvailable; return null; } finally { - for (Region.RowLock lock : rowLocks) { - lock.release(); - } - rowLocks.clear(); + releaseRowLocks(region,rowLocks); } } @@ -3248,6 +3380,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX]; byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX]; boolean deletePKColumn = false; + getCoprocessorHost().preAlterTable(Bytes.toString(tenantId), + SchemaUtil.getTableName(schemaName, tableName), + TableName.valueOf(table.getPhysicalName().getBytes()), + getParentPhysicalTableName(table),table.getType()); + List<Mutation> additionalTableMetaData = Lists.newArrayList(); PTableType type = table.getType(); @@ -3480,7 +3617,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } PIndexState newState = PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]); - RowLock rowLock = region.getRowLock(key, false); + RowLock rowLock = acquireLock(region, key, null); if (rowLock == null) { throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); } @@ -3502,6 +3639,22 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso Cell 
currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES); boolean rowKeyOrderOptimizable = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES) != null; + //check permission on data table + long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata); + PTable loadedTable = loadTable(env, key, new ImmutableBytesPtr(key), clientTimeStamp, clientTimeStamp, + request.getClientVersion()); + if (loadedTable == null) { + builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + return; + } + getCoprocessorHost().preIndexUpdate(Bytes.toString(tenantId), + SchemaUtil.getTableName(schemaName, tableName), + TableName.valueOf(loadedTable.getPhysicalName().getBytes()), + getParentPhysicalTableName(loadedTable), + newState); + PIndexState currentState = PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV .getValueOffset()]); @@ -3611,7 +3764,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (setRowKeyOrderOptimizableCell) { UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, key, timeStamp); } - region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); // Invalidate from cache Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); @@ -3772,6 +3925,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso long clientTimeStamp = request.getClientTimestamp(); List<RowLock> locks = Lists.newArrayList(); try { + getCoprocessorHost().preGetSchema(schemaName); acquireLock(region, lockKey, locks); // Get as of latest timestamp so we can detect if we have a // newer schema 
that already @@ -3802,7 +3956,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } finally { - region.releaseRowLocks(locks); + releaseRowLocks(region,locks); } } @@ -3905,7 +4059,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } // Don't store function info for temporary functions. if(!temporaryFunction) { - region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); + mutateRowsWithLocks(region, functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); } // Invalidate the cache - the next getFunction call will add it @@ -3919,7 +4073,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } finally { - region.releaseRowLocks(locks); + releaseRowLocks(region,locks); } } catch (Throwable t) { logger.error("createFunction failed", t); @@ -3958,7 +4112,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } - region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); + mutateRowsWithLocks(region, functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData); @@ -3971,7 +4125,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } finally { - region.releaseRowLocks(locks); + releaseRowLocks(region,locks); } } catch (Throwable t) { logger.error("dropFunction failed", t); @@ -4068,7 +4222,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso return; } } - region.mutateRowsWithLocks(schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + mutateRowsWithLocks(region, schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); // Invalidate the cache - the next getSchema call will add it @@ -4086,7 +4240,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } finally { - region.releaseRowLocks(locks); + releaseRowLocks(region,locks); } } catch (Throwable t) { logger.error("Creating the schema" + schemaName + "failed", t); @@ -4100,6 +4254,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso try { List<Mutation> schemaMetaData = ProtobufUtil.getMutations(request); schemaName = request.getSchemaName(); + getCoprocessorHost().preDropSchema(schemaName); byte[] lockKey = SchemaUtil.getSchemaKey(schemaName); Region region = env.getRegion(); MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region); @@ -4117,7 +4272,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } - region.mutateRowsWithLocks(schemaMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + mutateRowsWithLocks(region, schemaMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env) .getMetaDataCache(); @@ -4129,7 +4284,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } finally { - region.releaseRowLocks(locks); + releaseRowLocks(region,locks); } } catch (Throwable t) { logger.error("drop schema failed:", t); @@ -4175,4 +4330,48 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso null); } + + private void 
mutateRowsWithLocks(final Region region, final List<Mutation> mutations, final Set<byte[]> rowsToLock, + final long nonceGroup, final long nonce) throws IOException { + // we need to mutate SYSTEM.CATALOG with HBase/login user if access is enabled. + if (this.accessCheckEnabled) { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + final Call rpcContext = RpcUtil.getRpcContext(); + // Setting RPC context as null so that user can be resetted + try { + RpcUtil.setRpcContext(null); + region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce); + } catch (Throwable e) { + throw new IOException(e); + } finally { + // Setting RPC context back to original context of the RPC + RpcUtil.setRpcContext(rpcContext); + } + return null; + } + }); + } else { + region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce); + } + } + + private TableName getParentPhysicalTableName(PTable table) { + return table + .getType() == PTableType.VIEW + ? TableName.valueOf(table.getPhysicalName().getBytes()) + : table.getType() == PTableType.INDEX + ? TableName + .valueOf(SchemaUtil + .getPhysicalHBaseTableName(table.getParentSchemaName(), + table.getParentTableName(), table.isNamespaceMapped()) + .getBytes()) + : TableName + .valueOf( + SchemaUtil + .getPhysicalHBaseTableName(table.getSchemaName(), + table.getTableName(), table.isNamespaceMapped()) + .getBytes()); + } }