http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index 4d3c0cf..32e9f68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -16,6 +16,7 @@ * limitations under the License. */ package org.apache.phoenix.compile; +import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; import java.sql.SQLException; import java.util.ArrayList; @@ -24,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.FamilyWildcardParseNode; @@ -43,11 +43,13 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ProjectedColumn; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; @@ -120,7 +122,7 @@ public class TupleProjectionCompiler { PColumn sourceColumn = table.getPKColumns().get(i); ColumnRef sourceColumnRef = new ColumnRef(tableRef, sourceColumn.getPosition()); PColumn column = new ProjectedColumn(sourceColumn.getName(), 
sourceColumn.getFamilyName(), - position++, sourceColumn.isNullable(), sourceColumnRef); + position++, sourceColumn.isNullable(), sourceColumnRef, null); projectedColumns.add(column); } for (PColumn sourceColumn : table.getColumns()) { @@ -132,18 +134,18 @@ public class TupleProjectionCompiler { && !families.contains(sourceColumn.getFamilyName().getString())) continue; PColumn column = new ProjectedColumn(sourceColumn.getName(), sourceColumn.getFamilyName(), - position++, sourceColumn.isNullable(), sourceColumnRef); + position++, sourceColumn.isNullable(), sourceColumnRef, sourceColumn.getColumnQualifierBytes()); projectedColumns.add(column); // Wildcard or FamilyWildcard will be handled by ProjectionCompiler. if (!isWildcard && !families.contains(sourceColumn.getFamilyName())) { - context.getScan().addColumn(sourceColumn.getFamilyName().getBytes(), sourceColumn.getName().getBytes()); + EncodedColumnsUtil.setColumns(column, table, context.getScan()); } } // add LocalIndexDataColumnRef for (LocalIndexDataColumnRef sourceColumnRef : visitor.localIndexColumnRefSet) { PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(), sourceColumnRef.getColumn().getFamilyName(), position++, - sourceColumnRef.getColumn().isNullable(), sourceColumnRef); + sourceColumnRef.getColumn().isNullable(), sourceColumnRef, sourceColumnRef.getColumn().getColumnQualifierBytes()); projectedColumns.add(column); } @@ -154,9 +156,9 @@ public class TupleProjectionCompiler { null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), - table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); + table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } - + public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException { PTable table = tableRef.getTable(); boolean hasSaltingColumn = retainPKColumns && table.getBucketNum() != null; @@ -169,20 +171,23 @@ public class TupleProjectionCompiler { String aliasedName = tableRef.getTableAlias() == null ? SchemaUtil.getColumnName(table.getName().getString(), colName) : SchemaUtil.getColumnName(tableRef.getTableAlias(), colName); - - PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), - retainPKColumns && SchemaUtil.isPKColumn(sourceColumn) ? - null : PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), - position++, sourceColumn.isNullable(), sourceColumnRef); + PName familyName = SchemaUtil.isPKColumn(sourceColumn) ? (retainPKColumns ? null : PNameFactory.newName(VALUE_COLUMN_FAMILY)) : sourceColumn.getFamilyName(); + PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), familyName, + position++, sourceColumn.isNullable(), sourceColumnRef, sourceColumn.getColumnQualifierBytes()); projectedColumns.add(column); } + EncodedCQCounter cqCounter = EncodedCQCounter.NULL_COUNTER; + if (EncodedColumnsUtil.usesEncodedColumnNames(table)) { + cqCounter = EncodedCQCounter.copy(table.getEncodedCQCounter()); + } + return PTableImpl.makePTable(table.getTenantId(), PROJECTED_TABLE_SCHEMA, table.getName(), PTableType.PROJECTED, null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), retainPKColumns ? 
table.getBucketNum() : null, projectedColumns, null, null, Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(), - table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); + table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), cqCounter); } // For extracting column references from single select statement
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java index bc3466c..e5e18e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java @@ -39,6 +39,8 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.SchemaUtil; @@ -84,11 +86,12 @@ public class UnionCompiler { for (int i = 0; i < plan.getProjector().getColumnCount(); i++) { ColumnProjector colProj = plan.getProjector().getColumnProjector(i); String name = selectNodes == null ? 
colProj.getName() : selectNodes.get(i).getAlias(); + PName colName = PNameFactory.newName(name); PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME, targetTypes.get(i).getType(), targetTypes.get(i).getMaxLength(), targetTypes.get(i).getScale(), colProj.getExpression().isNullable(), i, targetTypes.get(i).getSortOrder(), 500, null, false, - colProj.getExpression().toString(), false, false); + colProj.getExpression().toString(), false, false, colName.getBytes()); projectedColumns.add(projectedColumn); } Long scn = statement.getConnection().getSCN(); @@ -98,7 +101,7 @@ public class UnionCompiler { null, null, projectedColumns, null, null, null, true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L, SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY, - statement.getConnection().getQueryServices().getProps()), null, false); + statement.getConnection().getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); TableRef tableRef = new TableRef(null, tempTable, 0, false); return tableRef; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 18070d4..7a285a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -748,7 +748,7 @@ public class UpsertCompiler { if (ptr.getLength() > 0) { byte[] uuidValue = ServerCacheClient.generateId(); scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - scan.setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get()); + 
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } ResultIterator iterator = aggPlan.iterator(); @@ -917,10 +917,10 @@ public class UpsertCompiler { UpdateColumnCompiler compiler = new UpdateColumnCompiler(context); int nColumns = onDupKeyPairs.size(); List<Expression> updateExpressions = Lists.newArrayListWithExpectedSize(nColumns); - LinkedHashSet<PColumn>updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1); + LinkedHashSet<PColumn> updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1); updateColumns.add(new PColumnImpl( table.getPKColumns().get(0).getName(), // Use first PK column name as we know it won't conflict with others - null, PVarbinary.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false)); + null, PVarbinary.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false, null)); for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) { ColumnName colName = columnPair.getFirst(); PColumn updateColumn = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 39451b8..3026514 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -46,17 +46,20 @@ import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor; import org.apache.phoenix.parse.SubqueryParseNode; +import 
org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ExpressionUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -169,12 +172,14 @@ public class WhereCompiler { public Expression visit(ColumnParseNode node) throws SQLException { ColumnRef ref = resolveColumn(node); TableRef tableRef = ref.getTableRef(); + Expression newColumnExpression = ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(ref.getColumn())) { + byte[] cq = tableRef.getTable().getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS + ? QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES : ref.getColumn().getColumnQualifierBytes(); // track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs - context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), ref.getColumn().getName() - .getBytes()); + context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq); } - return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); + return newColumnExpression; } @Override @@ -195,7 +200,7 @@ public class WhereCompiler { // just use that. 
try { if (!SchemaUtil.isPKColumn(ref.getColumn())) { - table.getColumn(ref.getColumn().getName().getString()); + table.getPColumnForColumnName(ref.getColumn().getName().getString()); } } catch (AmbiguousColumnException e) { disambiguateWithFamily = true; @@ -223,6 +228,7 @@ public class WhereCompiler { } } + public Count getCount() { return count; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index ba9f7c8..c340216 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -47,11 +47,15 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; @@ -77,12 +81,19 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String 
DELETE_CQ = "_DeleteCQ"; public static final String DELETE_CF = "_DeleteCF"; public static final String EMPTY_CF = "_EmptyCF"; + public static final String EMPTY_COLUMN_QUALIFIER = "_EmptyColumnQualifier"; public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex"; public static final String GROUP_BY_LIMIT = "_GroupByLimit"; public static final String LOCAL_INDEX = "_LocalIndex"; public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild"; + /* + * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation. + * Needed for backward compatibility purposes. TODO: get rid of this in next major release. + */ + public static final String LOCAL_INDEX_BUILD_PROTO = "_LocalIndexBuild"; public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema"; public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin"; + public static final String COLUMNS_STORED_IN_SINGLE_CELL = "_ColumnsStoredInSingleCell"; public static final String VIEW_CONSTANTS = "_ViewConstants"; public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; @@ -101,6 +112,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public final static String SCAN_OFFSET = "_RowOffset"; public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix"; public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix"; + public final static String MIN_QUALIFIER = "_MinQualifier"; + public final static String MAX_QUALIFIER = "_MaxQualifier"; + public final static String QUALIFIER_ENCODING_SCHEME = "_QualifierEncodingScheme"; + public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = "_ImmutableStorageEncodingScheme"; + public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = "_UseEncodedColumnQualifierList"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). 
Custom annotations @@ -111,6 +127,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { /** Exposed for testing */ public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; protected Configuration rawConf; + protected QualifierEncodingScheme encodingScheme; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -183,6 +200,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // start exclusive and the stop inclusive. ScanUtil.setupReverseScan(scan); } + this.encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); return s; } @@ -307,14 +325,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param indexMaintainer * @param viewConstants */ - protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final HRegion dataRegion, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr); + dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex); } /** @@ -332,7 +350,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param tx current transaction * @param viewConstants */ - protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final 
RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, @@ -340,7 +358,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { Transaction tx, final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) { return new RegionScanner() { private boolean hasReferences = checkForReferenceFiles(); @@ -437,11 +455,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result)); + Tuple tuple = projector.projectResults(toProject); result.clear(); result.add(tuple.getValue(0)); - if(arrayElementCell != null) + if (arrayElementCell != null) { result.add(arrayElementCell); + } } // There is a scanattribute set to retrieve the specific array element return next; @@ -475,7 +495,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + Tuple toProject = useQualifierAsListIndex ? 
new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result)); + Tuple tuple = projector.projectResults(toProject); result.clear(); result.add(tuple.getValue(0)); if(arrayElementCell != null) @@ -528,21 +549,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // Using KeyValueSchema to set and retrieve the value // collect the first kv to get the row Cell rowKv = result.get(0); - for (KeyValueColumnExpression kvExp : arrayKVRefs) { - if (kvExp.evaluate(tuple, ptr)) { - for (int idx = tuple.size() - 1; idx >= 0; idx--) { - Cell kv = tuple.getValue(idx); - if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length, - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()) - && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length, - kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) { - // remove the kv that has the full array values. - result.remove(idx); - break; - } - } - } - } byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs, kvSchemaBitSet, ptr); // Add a dummy kv with the exact value of the array index http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index 8cb6dac..0843ba2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -56,22 +56,27 @@ public class DelegateRegionScanner implements RegionScanner { delegate.close(); } + @Override public long getMaxResultSize() { return delegate.getMaxResultSize(); } + @Override public boolean next(List<Cell> arg0, int 
arg1) throws IOException { return delegate.next(arg0, arg1); } + @Override public boolean next(List<Cell> arg0) throws IOException { return delegate.next(arg0); } + @Override public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException { return delegate.nextRaw(arg0, arg1); } + @Override public boolean nextRaw(List<Cell> arg0) throws IOException { return delegate.nextRaw(arg0); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 49e3d71..da312ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; @@ -62,9 +63,13 @@ import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PInteger; import 
org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; @@ -123,14 +128,20 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { RegionScanner innerScanner = s; - byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); - List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes); + boolean useProto = false; + byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO); + useProto = localIndexBytes != null; + if (localIndexBytes == null) { + localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); + } + List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); TupleProjector tupleProjector = null; byte[][] viewConstants = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -139,13 +150,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); innerScanner = getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, - c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); + c.getEnvironment().getRegion(), indexMaintainers == null ? 
null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); } if (j != null) { innerScanner = new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan), - c.getEnvironment()); + c.getEnvironment(), useQualifierAsIndex); } long limit = Long.MAX_VALUE; @@ -377,7 +388,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { estDistVals = Math.max(MIN_DISTINCT_VALUES, (int) (Bytes.toInt(estDistValsBytes) * 1.5f)); } - + + Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); + boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); @@ -388,12 +401,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean success = false; try { boolean hasMore; - - MultiKeyValueTuple result = new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan))); } - HRegion region = c.getEnvironment().getRegion(); boolean acquiredLock = false; try { @@ -401,7 +412,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { acquiredLock = true; synchronized (scanner) { do { - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? 
new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there are @@ -436,7 +447,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } } } - + /** * Used for an aggregate query in which the key order match the group by key order. In this * case, we can do the aggregation as we scan, by detecting when the group by key changes. @@ -451,6 +462,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); } + final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); + final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); return new BaseRegionScanner(scanner) { private long rowCount = 0; private ImmutableBytesPtr currentKey = null; @@ -460,7 +473,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean hasMore; boolean atLimit; boolean aggBoundary = false; - MultiKeyValueTuple result = new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); ImmutableBytesPtr key = null; Aggregator[] rowAggregators = aggregators.getAggregators(); // If we're calculating no aggregate functions, we can exit at the @@ -473,7 +486,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { acquiredLock = true; synchronized (scanner) { do { - List<Cell> kvs = new ArrayList<Cell>(); + List<Cell> kvs = useQualifierAsIndex ? 
new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 480ee6d..1ea6b12 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -42,6 +42,7 @@ import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ServerUtil; @@ -61,9 +62,10 @@ public class HashJoinRegionScanner implements RegionScanner { private List<Tuple>[] tempTuples; private ValueBitSet tempDestBitSet; private ValueBitSet[] tempSrcBitSet; + private final boolean useQualifierAsListIndex; @SuppressWarnings("unchecked") - public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env) throws IOException { + public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env, boolean useQualifierAsIndex) throws IOException { this.env = 
env; this.scanner = scanner; this.projector = projector; @@ -104,13 +106,13 @@ public class HashJoinRegionScanner implements RegionScanner { this.tempDestBitSet = ValueBitSet.newInstance(joinInfo.getJoinedSchema()); this.projector.setValueBitSet(tempDestBitSet); } + this.useQualifierAsListIndex = useQualifierAsIndex; } private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException { if (result.isEmpty()) return; - - Tuple tuple = new ResultTuple(Result.create(result)); + Tuple tuple = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result)); // For backward compatibility. In new versions, HashJoinInfo.forceProjection() // always returns true. if (joinInfo.forceProjection()) { @@ -314,7 +316,6 @@ public class HashJoinRegionScanner implements RegionScanner { processResults(result, limit >= 0); result.clear(); } - return nextInQueue(result); } catch (Throwable t) { ServerUtil.throwIOException(env.getRegion().getRegionNameAsString(), t); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index aec4482..a552c74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -27,6 +27,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX; +import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES; @@ -34,6 +36,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYT import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES; @@ -57,6 +60,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX; @@ -189,8 +193,11 @@ import org.apache.phoenix.schema.PMetaDataEntity; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import 
org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.LinkType; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; @@ -209,10 +216,12 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PSmallint; +import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -283,6 +292,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES); private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES); private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES); + private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES); + private static final KeyValue ENCODING_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODING_SCHEME_BYTES); private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList( EMPTY_KEYVALUE_KV, @@ -309,7 +320,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso 
UPDATE_CACHE_FREQUENCY_KV, IS_NAMESPACE_MAPPED_KV, AUTO_PARTITION_SEQ_KV, - APPEND_ONLY_SCHEMA_KV + APPEND_ONLY_SCHEMA_KV, + STORAGE_SCHEME_KV, + ENCODING_SCHEME_KV ); static { Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR); @@ -339,6 +352,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV); private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV); private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV); + private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV); + private static final int QUALIFIER_ENCODING_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(ENCODING_SCHEME_KV); // KeyValues for Column private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); @@ -352,6 +367,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES); private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES); private static final KeyValue IS_ROW_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES); + private static final KeyValue COLUMN_QUALIFIER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES); private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList( DECIMAL_DIGITS_KV, COLUMN_SIZE_KV, @@ -364,11 +380,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso VIEW_CONSTANT_KV, IS_VIEW_REFERENCED_KV, COLUMN_DEF_KV, - 
IS_ROW_TIMESTAMP_KV + IS_ROW_TIMESTAMP_KV, + COLUMN_QUALIFIER_KV ); static { Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR); } + private static final KeyValue QUALIFIER_COUNTER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES); private static final int DECIMAL_DIGITS_INDEX = COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV); private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV); private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV); @@ -380,9 +398,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV); private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV); private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV); + private static final int COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_QUALIFIER_KV); private static final int LINK_TYPE_INDEX = 0; - + private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES); private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES); private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES); @@ -718,8 +737,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso isRowTimestampKV == null ? 
false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject( isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(), isRowTimestampKV.getValueLength())); - - PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false); + + boolean isPkColumn = famName == null || famName.getString() == null; + Cell columnQualifierKV = colKeyValues[COLUMN_QUALIFIER_INDEX]; + // Older tables won't have column qualifier metadata present. To make things simpler, just set the + // column qualifier bytes by using the column name. + byte[] columnQualifierBytes = columnQualifierKV != null ? + Arrays.copyOfRange(columnQualifierKV.getValueArray(), + columnQualifierKV.getValueOffset(), columnQualifierKV.getValueOffset() + + columnQualifierKV.getValueLength()) : (isPkColumn ? null : colName.getBytes()); + PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifierBytes); columns.add(column); } @@ -927,37 +954,55 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(), isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength())); - + Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX]; + //TODO: change this once we start having other values for storage schemes + ImmutableStorageScheme storageScheme = storageSchemeKv == null ? 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ImmutableStorageScheme + .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(), + storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength())); + Cell encodingSchemeKv = tableKeyValues[QUALIFIER_ENCODING_SCHEME_INDEX]; + QualifierEncodingScheme encodingScheme = encodingSchemeKv == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : QualifierEncodingScheme + .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(encodingSchemeKv.getValueArray(), + encodingSchemeKv.getValueOffset(), encodingSchemeKv.getValueLength())); List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount); List<PTable> indexes = Lists.newArrayList(); List<PName> physicalTables = Lists.newArrayList(); PName parentTableName = tableType == INDEX ? dataTableName : null; PName parentSchemaName = tableType == INDEX ? schemaName : null; + EncodedCQCounter cqCounter = + (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || tableType == PTableType.VIEW) ? 
PTable.EncodedCQCounter.NULL_COUNTER + : new EncodedCQCounter(); while (true) { - results.clear(); - scanner.next(results); - if (results.isEmpty()) { - break; - } - Cell colKv = results.get(LINK_TYPE_INDEX); - int colKeyLength = colKv.getRowLength(); - PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset); - int colKeyOffset = offset + colName.getBytes().length + 1; - PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset); - if (colName.getString().isEmpty() && famName != null) { - LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); - if (linkType == LinkType.INDEX_TABLE) { - addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes); - } else if (linkType == LinkType.PHYSICAL_TABLE) { - physicalTables.add(famName); - } else if (linkType == LinkType.PARENT_TABLE) { - parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes())); - parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes())); - } - } else { - addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null); - } + results.clear(); + scanner.next(results); + if (results.isEmpty()) { + break; + } + Cell colKv = results.get(LINK_TYPE_INDEX); + if (colKv != null) { + int colKeyLength = colKv.getRowLength(); + PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset); + int colKeyOffset = offset + colName.getBytes().length + 1; + PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset); + if (isQualifierCounterKV(colKv)) { + Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC); + cqCounter.setValue(famName.getString(), value); + } else { + if (colName.getString().isEmpty() && famName != 
null) { + LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); + if (linkType == LinkType.INDEX_TABLE) { + addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes); + } else if (linkType == LinkType.PHYSICAL_TABLE) { + physicalTables.add(famName); + } else if (linkType == LinkType.PARENT_TABLE) { + parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes())); + parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes())); + } + } else { + addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null); + } + } + } } // Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote // server while holding this lock is a bad idea and likely to cause contention. @@ -965,9 +1010,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount, - indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema); + indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounter); } - + + private boolean isQualifierCounterKV(Cell kv) { + int cmp = + Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength(), QUALIFIER_COUNTER_KV.getQualifierArray(), + QUALIFIER_COUNTER_KV.getQualifierOffset(), QUALIFIER_COUNTER_KV.getQualifierLength()); + return cmp == 0; + } + private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException { List<Cell> results = Lists.newArrayList(); scanner.next(results); @@ -1419,7 +1472,6 @@ public class 
MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // tableMetadata and set the view statement and partition column correctly if (parentTable!=null && parentTable.getAutoPartitionSeqName()!=null) { long autoPartitionNum = 1; - final Properties props = new Properties(); try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); Statement stmt = connection.createStatement()) { String seqName = parentTable.getAutoPartitionSeqName(); @@ -1487,46 +1539,46 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso Short indexId = null; if (request.hasAllocateIndexId() && request.getAllocateIndexId()) { String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes); - try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)){ - PName physicalName = parentTable.getPhysicalName(); - int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets(); - SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, + try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) { + PName physicalName = parentTable.getPhysicalName(); + int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets(); + SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, nSequenceSaltBuckets, parentTable.isNamespaceMapped() ); // TODO Review Earlier sequence was created at (SCN-1/LATEST_TIMESTAMP) and incremented at the client max(SCN,dataTable.getTimestamp), but it seems we should // use always LATEST_TIMESTAMP to avoid seeing wrong sequence values by different connection having SCN // or not. 
- long sequenceTimestamp = HConstants.LATEST_TIMESTAMP; - try { - connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(), + long sequenceTimestamp = HConstants.LATEST_TIMESTAMP; + try { + connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(), Short.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false, sequenceTimestamp); - } catch (SequenceAlreadyExistsException e) { - } - long[] seqValues = new long[1]; - SQLException[] sqlExceptions = new SQLException[1]; - connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)), + } catch (SequenceAlreadyExistsException e) { + } + long[] seqValues = new long[1]; + SQLException[] sqlExceptions = new SQLException[1]; + connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)), HConstants.LATEST_TIMESTAMP, seqValues, sqlExceptions); - if (sqlExceptions[0] != null) { - throw sqlExceptions[0]; - } - long seqValue = seqValues[0]; - if (seqValue > Short.MAX_VALUE) { - builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES); - builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); - done.run(builder.build()); - return; - } - Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata); - NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap(); - List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES); - Cell cell = cells.get(0); - PDataType dataType = MetaDataUtil.getViewIndexIdDataType(); - Object val = dataType.toObject(seqValue, PLong.INSTANCE); - byte[] bytes = new byte [dataType.getByteSize() + 1]; - dataType.toBytes(val, bytes, 0); - Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES, + if (sqlExceptions[0] != null) { + throw sqlExceptions[0]; + } + long seqValue = seqValues[0]; + if (seqValue > Short.MAX_VALUE) { + 
builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + return; + } + Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata); + NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap(); + List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES); + Cell cell = cells.get(0); + PDataType dataType = MetaDataUtil.getViewIndexIdDataType(); + Object val = dataType.toObject(seqValue, PLong.INSTANCE); + byte[] bytes = new byte [dataType.getByteSize() + 1]; + dataType.toBytes(val, bytes, 0); + Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES, cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes); - cells.add(indexIdCell); - indexId = (short) seqValue; + cells.add(indexIdCell); + indexId = (short) seqValue; } } @@ -1537,7 +1589,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // indexing on the system table. This is an issue because of the way we manage batch mutation // in the Indexer. region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); - + // Invalidate the cache - the next getTable call will add it // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); @@ -1989,7 +2041,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return result; } region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); - // Invalidate from cache + // Invalidate from cache. 
for (ImmutableBytesPtr invalidateKey : invalidateList) { metaDataCache.invalidate(invalidateKey); } @@ -2169,6 +2221,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso int pkCount = getVarChars(m.getRow(), rkmd); // check if this put is for adding a column if (pkCount > COLUMN_NAME_INDEX + && rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0 && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0 && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) { columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES))); @@ -2221,8 +2274,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]); String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]); try { - existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily( - columnFamily).getColumn(columnName); + existingViewColumn = columnFamily == null ? view.getPColumnForColumnName(columnName) : view.getColumnFamily( + columnFamily).getPColumnForColumnName(columnName); } catch (ColumnFamilyNotFoundException e) { // ignore since it means that the column family is not present for the column to be added. } catch (ColumnNotFoundException e) { @@ -2551,8 +2604,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily); try { existingViewColumn = - columnFamily == null ? view.getColumn(columnName) : view - .getColumnFamily(columnFamily).getColumn(columnName); + columnFamily == null ? view.getPColumnForColumnName(columnName) : view + .getColumnFamily(columnFamily).getPColumnForColumnName(columnName); } catch (ColumnFamilyNotFoundException e) { // ignore since it means that the column family is not present for the column to // be added. 
@@ -2618,7 +2671,25 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) { if (existingViewColumn != null) { - + if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) && !SchemaUtil.isPKColumn(existingViewColumn)) { + /* + * If the column already exists in a view, then we cannot add the column to the base + * table. The reason is subtle and is as follows: consider the case where a table + * has two views where both the views have the same key value column KV. Now, we + * dole out encoded column qualifiers for key value columns in views by using the + * counters stored in the base physical table. So the KV column can have different + * column qualifiers for the two views. For example, 11 for VIEW1 and 12 for VIEW2. + * This naturally extends to rows being inserted using the two views having + * different column qualifiers for the column named KV. Now, when an attempt is made + * to add column KV to the base table, we cannot decide which column qualifier + * that column should be assigned. It cannot be a number different than 11 or 12 + * since a query like SELECT KV FROM BASETABLE would return null for KV which is + * incorrect since column KV is present in rows inserted from the two views. We + * cannot use 11 or 12 either because we will then incorrectly return the value of KV + * column inserted using only one view. 
+ */ + return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); + } // Validate data type is same int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES); if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) { @@ -2848,6 +2919,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return mutationResult; } } + } else if (type == PTableType.VIEW + && EncodedColumnsUtil.usesEncodedColumnNames(table)) { + /* + * When adding a column to a view that uses encoded column name scheme, we + * need to modify the CQ counters stored in the view's physical table. So to + * make sure clients get the latest PTable, we need to invalidate the cache + * entry. + */ + invalidateList.add(new ImmutableBytesPtr(MetaDataUtil + .getPhysicalTableRowForView(table))); } for (Mutation m : tableMetaData) { byte[] key = m.getRow(); @@ -2861,7 +2942,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) { PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]); - family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); + family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) { addingPKColumn = true; @@ -3114,7 +3195,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]); columnToDelete = - family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); + family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); } else if (pkCount > 
COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) { deletePKColumn = true; @@ -3203,10 +3284,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] indexKey = SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index .getTableName().getBytes()); + Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(), columnToDelete.getName().getString()); + ColumnReference colDropRef = new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete.getColumnQualifierBytes()); + boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo); + boolean isCoveredColumn = indexMaintainer.getCoveredColumns().contains(colDropRef); // If index requires this column for its pk, then drop it - if (indexMaintainer.getIndexedColumns().contains( - new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete - .getName().getBytes()))) { + if (isColumnIndexed) { // Since we're dropping the index, lock it to ensure // that a change in index state doesn't // occur while we're dropping it. 
@@ -3227,9 +3310,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso invalidateList.add(new ImmutableBytesPtr(indexKey)); } // If the dropped column is a covered index column, invalidate the index - else if (indexMaintainer.getCoveredColumns().contains( - new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete - .getName().getBytes()))) { + else if (isCoveredColumn){ invalidateList.add(new ImmutableBytesPtr(indexKey)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 83290db..dd445ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -85,8 +85,9 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 = MIN_TABLE_TIMESTAMP + 18; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 = MIN_TABLE_TIMESTAMP + 18; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0 = MIN_TABLE_TIMESTAMP + 20; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 = MIN_TABLE_TIMESTAMP + 25; // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0; // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string. 
@@ -101,6 +102,7 @@ public abstract class MetaDataProtocol extends MetaDataService { TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, "4.7.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, "4.8.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0, "4.9.x"); + TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0, "4.10.x"); } public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index d5e5542..1fb8221 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -386,7 +386,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, conn); byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); - dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); MutationState mutationState = plan.execute(); long rowCount = mutationState.getUpdateCount(); LOG.info(rowCount + " rows of index which are rebuild"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 3cfe790..0c063ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFromScan; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -44,6 +46,7 @@ import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.function.ArrayIndexFunction; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -57,10 +60,13 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; @@ -107,7 +113,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } } - public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) { + private static 
OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) { byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN); if (topN == null) { return null; @@ -125,7 +131,8 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { orderByExpression.readFields(input); orderByExpressions.add(orderByExpression); } - ResultIterator inner = new RegionScannerResultIterator(s); + QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme); return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize); } catch (IOException e) { @@ -151,7 +158,9 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { DataInputStream input = new DataInputStream(stream); int arrayKVRefSize = WritableUtils.readVInt(input); for (int i = 0; i < arrayKVRefSize; i++) { - KeyValueColumnExpression kvExp = new KeyValueColumnExpression(); + ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan); + KeyValueColumnExpression kvExp = scheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression() + : new KeyValueColumnExpression(); kvExp.readFields(input); arrayKVRefs.add(kvExp); } @@ -208,8 +217,13 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); dataRegion = c.getEnvironment().getRegion(); - byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); - List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes); + boolean useProto = false; + byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO); + useProto = localIndexBytes != null; + if (localIndexBytes == null) { + localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); + } + List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); indexMaintainer = indexMaintainers.get(0); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); @@ -218,21 +232,22 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(getMinMaxQualifiersFromScan(scan)) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null; innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, tx, - viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); + viewConstants, kvSchema, kvSchemaBitSet, j == null ? 
p : null, ptr, useQualifierAsIndex); final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan); if (j != null) { - innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment()); + innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment(), useQualifierAsIndex); } if (scanOffset != null) { innerScanner = getOffsetScanner(c, innerScanner, - new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset), + new OffsetResultIterator(new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset), scan.getAttribute(QueryConstants.LAST_SCAN) != null); } - final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner); + final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner); if (iterator == null) { return innerScanner; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java index bf889d5..98f57ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java @@ -72,7 +72,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C (Class<ServerCacheFactory>) Class.forName(request.getCacheFactory().getClassName()); ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance(); tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()), - cachePtr, txState, cacheFactory); + cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && 
request.getHasProtoBufIndexMaintainer()); } catch (Throwable e) { ProtobufUtil.setControllerException(controller, new IOException(e)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java index b201c8e..139a69c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java @@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk; */ public interface ServerCachingProtocol { public static interface ServerCacheFactory extends Writable { - public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException; + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException; } /** * Add the cache to the region server cache.