http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 237ed75..d3a3ca4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -17,7 +17,10 @@ */ package org.apache.phoenix.index; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; @@ -29,9 +32,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hbase.Cell; @@ -43,17 +48,24 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexExpressionCompiler; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.generated.ServerCachingProtos; +import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ColumnInfo; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.expression.SingleCellColumnExpression; +import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -67,14 +79,17 @@ import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor; import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.AmbiguousColumnException; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SaltingUtil; @@ -82,10 +97,12 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; @@ -93,6 +110,7 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; import org.apache.tephra.TxConstants; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -105,10 +123,10 @@ import com.google.common.collect.Sets; * row and caches any covered columns. Client-side serializes into byte array using * @link #serialize(PTable, ImmutableBytesWritable)} * and transmits to server-side through either the - * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_MD} + * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_PROTO_MD} * Mutation attribute or as a separate RPC call using * {@link org.apache.phoenix.cache.ServerCacheClient}) - * + * * * @since 2.1.0 */ @@ -116,8 +134,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private static final int EXPRESSION_NOT_PRESENT = -1; private static final int ESTIMATED_EXPRESSION_SIZE = 8; - - public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) { + + public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) { if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) { throw new IllegalArgumentException(); } @@ -179,14 +197,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } } int nIndexes = 0; - int estimatedSize = dataTable.getRowKeySchema().getEstimatedByteSize() + 2; while (indexesItr.hasNext()) { nIndexes++; - PTable index = indexesItr.next(); - estimatedSize += index.getIndexMaintainer(dataTable, connection).getEstimatedByteSize(); + indexesItr.next(); } - TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedSize + 1); - DataOutput output = new DataOutputStream(stream); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(stream); try { // Encode data table salting in sign of number of indexes WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1)); @@ -196,15 +212,23 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { dataTable.isImmutableRows() ? enabledLocalIndexIterator(indexes.iterator()) : nonDisabledIndexIterator(indexes.iterator()); while (indexesItr.hasNext()) { - indexesItr.next().getIndexMaintainer(dataTable, connection).write(output); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection)); + byte[] protoBytes = proto.toByteArray(); + WritableUtils.writeVInt(output, protoBytes.length); + output.write(protoBytes); } } catch (IOException e) { throw new RuntimeException(e); // Impossible } - ptr.set(stream.getBuffer(), 0, stream.size()); + ptr.set(stream.toByteArray(), 0, stream.size()); } - + /** + * For client-side to append serialized IndexMaintainers of keyValueIndexes + * @param dataTable data table + * @param indexMetaDataPtr bytes pointer to hold returned serialized value + * @param keyValueIndexes indexes to serialize + */ public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr, List<PTable> keyValueIndexes, PhoenixConnection connection) { int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr); @@ -230,7 +254,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } // Serialize mutable indexes afterwards for (PTable index : keyValueIndexes) { - index.getIndexMaintainer(table, connection).write(output); + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + byte[] protoBytes = IndexMaintainer.toProto(maintainer).toByteArray(); + WritableUtils.writeVInt(output, protoBytes.length); + output.write(protoBytes); } } catch (IOException e) { throw new RuntimeException(e); // Impossible @@ -239,15 +266,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr, - KeyValueBuilder builder) { - return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength()); + KeyValueBuilder builder, boolean useProtoForIndexMaintainer) { + return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength(), useProtoForIndexMaintainer); } - public static List<IndexMaintainer> deserialize(byte[] buf) { - return deserialize(buf, 0, buf.length); + public static List<IndexMaintainer> deserialize(byte[] buf, boolean useProtoForIndexMaintainer) { + return deserialize(buf, 0, buf.length, useProtoForIndexMaintainer); } - public static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) { + private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) { ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); DataInput input = new DataInputStream(stream); List<IndexMaintainer> maintainers = Collections.emptyList(); @@ -259,25 +286,31 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { rowKeySchema.readFields(input); maintainers = Lists.newArrayListWithExpectedSize(size); for (int i = 0; i < size; i++) { - IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); - maintainer.readFields(input); - maintainers.add(maintainer); + if (useProtoForIndexMaintainer) { + int protoSize = WritableUtils.readVInt(input); + byte[] b = new byte[protoSize]; + input.readFully(b); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); + maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); + } else { + IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); + maintainer.readFields(input); + maintainers.add(maintainer); + } } } catch (IOException e) { throw new RuntimeException(e); // Impossible } return maintainers; } - + private byte[] viewIndexId; private boolean isMultiTenant; // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column private List<Expression> indexedExpressions; // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; - private Set<ColumnReference> coveredColumns; - // Map used to cache column family of data table and the corresponding column family for the local index - private Map<ImmutableBytesPtr, ImmutableBytesWritable> dataTableLocalIndexFamilyMap; + // columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -291,12 +324,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private boolean indexWALDisabled; private boolean isLocalIndex; private boolean immutableRows; - // Transient state private final boolean isDataTableSalted; private final RowKeySchema dataRowKeySchema; - private List<ImmutableBytesPtr> indexQualifiers; private int estimatedIndexRowKeyBytes; private int estimatedExpressionSize; private int[] dataPkPosition; @@ -304,26 +335,48 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private ColumnReference dataEmptyKeyValueRef; private boolean rowKeyOrderOptimizable; + /**** START: New member variables added in 4.10 *****/ + private QualifierEncodingScheme encodingScheme; + private ImmutableStorageScheme immutableStorageScheme; + /* + * Information for columns of data tables that are being indexed. The first part of the pair is column family name + * and second part is the column name. The reason we need to track this state is because for certain storage schemes + * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate an index + * table put/delete is different from the columns that are indexed in the phoenix schema. This information helps us + * determine whether or not certain operations like DROP COLUMN should impact the index. + */ + private Set<Pair<String, String>> indexedColumnsInfo; + /* + * Map of covered columns where a key is column reference for a column in the data table + * and value is column reference for corresponding column in the index table. + */ + private Map<ColumnReference, ColumnReference> coveredColumnsMap; + /**** END: New member variables added in 4.10 *****/ + private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) { this.dataRowKeySchema = dataRowKeySchema; this.isDataTableSalted = isDataTableSalted; } - - private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) { + + private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null); assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW); this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable(); this.isMultiTenant = dataTable.isMultiTenant(); this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; - + this.encodingScheme = index.getEncodingScheme(); + + // null check for b/w compatibility + this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme(); + this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme(); + byte[] indexTableName = index.getPhysicalName().getBytes(); // Use this for the nDataSaltBuckets as we need this for local indexes // TODO: persist nDataSaltBuckets separately, but maintain b/w compat. Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum(); boolean indexWALDisabled = index.isWALDisabled(); int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1); -// int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0; int nIndexColumns = index.getColumns().size() - indexPosOffset; int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset; // number of expressions that are indexed that are not present in the row key of the data table @@ -334,7 +387,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); try { - PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName); + PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName); if (SchemaUtil.isPKColumn(dataColumn)) continue; } catch (ColumnNotFoundException e) { @@ -366,8 +419,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexTableName = indexTableName; this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); - this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns); - this.dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns); + this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns); this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index); @@ -397,6 +449,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver); + this.indexedColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); + IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context); for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) { PColumn indexColumn = index.getPKColumns().get(i); @@ -409,12 +463,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } if ( expressionIndexCompiler.getColumnRef()!=null ) { - // get the column of the data table that corresponds to this index column + // get the column of the data column that corresponds to this index column PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); boolean isPKColumn = SchemaUtil.isPKColumn(column); if (isPKColumn) { int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0); this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos); + indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString())); } else { indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; try { @@ -424,6 +479,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { expression = CoerceExpression.create(expression, indexColumn.getDataType()); } this.indexedExpressions.add(expression); + indexedColumnsInfo.add(new Pair<>(column.getFamilyName().getString(), column.getName().getString())); } catch (SQLException e) { throw new RuntimeException(e); // Impossible } @@ -432,6 +488,45 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { else { indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; this.indexedExpressions.add(expression); + KeyValueExpressionVisitor kvVisitor = new KeyValueExpressionVisitor() { + @Override + public Void visit(KeyValueColumnExpression colExpression) { + return addDataColInfo(dataTable, colExpression); + } + + @Override + public Void visit(SingleCellColumnExpression expression) { + return addDataColInfo(dataTable, expression); + } + + private Void addDataColInfo(final PTable dataTable, Expression expression) { + Preconditions.checkArgument(expression instanceof SingleCellColumnExpression + || expression instanceof KeyValueColumnExpression); + + KeyValueColumnExpression colExpression = null; + if (expression instanceof SingleCellColumnExpression) { + colExpression = + ((SingleCellColumnExpression) expression).getKeyValueExpression(); + } else { + colExpression = ((KeyValueColumnExpression) expression); + } + byte[] cf = colExpression.getColumnFamily(); + byte[] cq = colExpression.getColumnQualifier(); + try { + PColumn dataColumn = + cf == null ? dataTable.getPColumnForColumnQualifier(null, cq) + : dataTable.getColumnFamily(cf) + .getPColumnForColumnQualifier(cq); + indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName() + .getString(), dataColumn.getName().getString())); + } catch (ColumnNotFoundException | ColumnFamilyNotFoundException + | AmbiguousColumnException e) { + throw new RuntimeException(e); + } + return null; + } + }; + expression.accept(kvVisitor); } // set the sort order of the expression correctly if (indexColumn.getSortOrder() == SortOrder.DESC) { @@ -442,18 +537,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (int i = 0; i < index.getColumnFamilies().size(); i++) { PColumnFamily family = index.getColumnFamilies().get(i); for (PColumn indexColumn : family.getColumns()) { - PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); - PName dataTableFamily = column.getFamilyName(); - this.coveredColumns.add(new ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes())); - if(isLocalIndex) { - this.dataTableLocalIndexFamilyMap.put(new ImmutableBytesPtr(dataTableFamily.getBytes()), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString())))); - } + PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); + byte[] dataColumnCq = dataColumn.getColumnQualifierBytes(); + byte[] indexColumnCq = indexColumn.getColumnQualifierBytes(); + this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), + new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq)); } } this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize); initCachedState(); } - + public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); boolean prependRegionStartKey = isLocalIndex && regionStartKey != null; @@ -854,37 +948,106 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return indexRowKeySchema; } - + public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); Put put = null; // New row being inserted: add the empty key value if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) { - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts, + this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, // set the value to the empty column name - QueryConstants.EMPTY_COLUMN_BYTES_PTR)); + dataEmptyKeyValueRef.getQualifierWritable())); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - int i = 0; - for (ColumnReference ref : this.getCoveredColumns()) { - ImmutableBytesPtr cq = this.indexQualifiers.get(i++); - ImmutableBytesWritable value = valueGetter.getLatestValue(ref); - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); - if (value != null) { + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); + if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { + // map from index column family to list of pair of index column and data column (for covered columns) + Map<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap(); + for (ColumnReference ref : this.getCoveredColumns()) { + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily()); + if (!familyToColListMap.containsKey(cf)) { + familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList()); + } + familyToColListMap.get(cf).add(Pair.newPair(indexColRef, ref)); + } + // iterate over each column family and create a byte[] containing all the columns + for (Entry<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> entry : familyToColListMap.entrySet()) { + byte[] columnFamily = entry.getKey().copyBytesIfNecessary(); + List<Pair<ColumnReference, ColumnReference>> colRefPairs = entry.getValue(); + int maxEncodedColumnQualifier = Integer.MIN_VALUE; + // find the max col qualifier + for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) { + maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, encodingScheme.decode(colRefPair.getFirst().getQualifier())); + } + Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier); + // set the values of the columns + for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) { + ColumnReference indexColRef = colRefPair.getFirst(); + ColumnReference dataColRef = colRefPair.getSecond(); + Expression expression = new SingleCellColumnExpression(new PDatum() { + @Override + public boolean isNullable() { + return false; + } + + @Override + public SortOrder getSortOrder() { + return null; + } + + @Override + public Integer getScale() { + return null; + } + + @Override + public Integer getMaxLength() { + return null; + } + + @Override + public PDataType getDataType() { + return null; + } + }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme); + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); + expression.evaluate(new ValueGetterTuple(valueGetter), ptr); + byte[] value = ptr.copyBytesIfNecessary(); + if (value != null) { + int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1; + colValues[indexArrayPos] = new LiteralExpression(value); + } + } + + List<Expression> children = Arrays.asList(colValues); + // we use SingleCellConstructorExpression to serialize multiple columns into a single byte[] + SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + singleCellConstructorExpression.evaluate(new BaseTuple() {}, ptr); if (put == null) { put = new Put(indexRowKey); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } + ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily); //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC - if(this.isLocalIndex) { - ImmutableBytesWritable localIndexColFamily = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()); - put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, cq, ts, value)); - } else { - put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); + put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr)); + } + } else { + for (ColumnReference ref : this.getCoveredColumns()) { + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); + ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); + ImmutableBytesWritable value = valueGetter.getLatestValue(ref); + if (value != null) { + if (put == null) { + put = new Put(indexRowKey); + put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); + } + put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); } } } @@ -962,7 +1125,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<KeyValue>emptyList(), ts, null, null); } - @SuppressWarnings("deprecation") public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey); // Delete the entire row if any of the indexed columns changed @@ -972,15 +1134,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { Delete delete = new Delete(indexRowKey); for (ColumnReference ref : getCoveredColumns()) { - byte[] family = ref.getFamily(); - if (this.isLocalIndex) { - family = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get(); - } + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { - delete.deleteFamilyVersion(family, ts); + delete.deleteFamilyVersion(indexColumn.getFamily(), ts); } else { - delete.deleteFamily(family, ts); + delete.deleteFamily(indexColumn.getFamily(), ts); } } if (deleteType == DeleteType.SINGLE_VERSION) { @@ -992,34 +1151,35 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return delete; } Delete delete = null; + Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet(); // Delete columns for missing key values for (Cell kv : pendingUpdates) { if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) { ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier()); - if (coveredColumns.contains(ref)) { + if (dataTableColRefs.contains(ref)) { if (delete == null) { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - byte[] family = this.isLocalIndex ? this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : ref.getFamily(); + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If point delete for data table, then use point delete for index as well - if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { - delete.deleteColumn(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { + delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } else { - delete.deleteColumns(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } } } } return delete; } - + public byte[] getIndexTableName() { return indexTableName; } public Set<ColumnReference> getCoveredColumns() { - return coveredColumns; + return coveredColumnsMap.keySet(); } public Set<ColumnReference> getAllColumns() { @@ -1032,7 +1192,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // If if there are no covered columns, we know it's our default name return emptyKeyValueCFPtr; } - + + @Deprecated // Only called by code older than our 4.10 release @Override public void readFields(DataInput input) throws IOException { int encodedIndexSaltBucketsAndMultiTenant = WritableUtils.readVInt(input); @@ -1060,16 +1221,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int encodedCoveredolumnsAndLocalIndex = WritableUtils.readVInt(input); isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0; int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1; - coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns); - dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); + coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); for (int i = 0; i < nCoveredColumns; i++) { - byte[] cf = Bytes.readByteArray(input); - byte[] cq = Bytes.readByteArray(input); - ColumnReference ref = new ColumnReference(cf,cq); - coveredColumns.add(ref); - if(isLocalIndex) { - dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))))); - } + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + ColumnReference dataTableRef = new ColumnReference(dataTableCf, dataTableCq); + byte[] indexTableCf = isLocalIndex ? IndexUtil.getLocalIndexColumnFamily(dataTableCf) : dataTableCf; + byte[] indexTableCq = IndexUtil.getIndexColumnName(dataTableCf, dataTableCq); + ColumnReference indexTableRef = new ColumnReference(indexTableCf, indexTableCq); + coveredColumnsMap.put(dataTableRef, indexTableRef); } // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); @@ -1097,9 +1257,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int numIndexedExpressions = WritableUtils.readVInt(input); indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions); for (int i = 0; i < numIndexedExpressions; i++) { - Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); - expression.readFields(input); - indexedExpressions.add(expression); + Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); + expression.readFields(input); + indexedExpressions.add(expression); } } else { @@ -1151,6 +1311,79 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { initCachedState(); } + + public static IndexMaintainer fromProto(ServerCachingProtos.IndexMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException { + IndexMaintainer maintainer = new IndexMaintainer(dataTableRowKeySchema, isDataTableSalted); + maintainer.nIndexSaltBuckets = proto.getSaltBuckets(); + maintainer.isMultiTenant = proto.getIsMultiTenant(); + maintainer.viewIndexId = proto.hasViewIndexId() ? proto.getViewIndexId().toByteArray() : null; + List<ServerCachingProtos.ColumnReference> indexedColumnsList = proto.getIndexedColumnsList(); + maintainer.indexedColumns = new HashSet<ColumnReference>(indexedColumnsList.size()); + for (ServerCachingProtos.ColumnReference colRefFromProto : indexedColumnsList) { + maintainer.indexedColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray())); + } + List<Integer> indexedColumnTypes = proto.getIndexedColumnTypeOrdinalList(); + maintainer.indexedColumnTypes = new ArrayList<PDataType>(indexedColumnTypes.size()); + for (Integer typeOrdinal : indexedColumnTypes) { + maintainer.indexedColumnTypes.add(PDataType.values()[typeOrdinal]); + } + maintainer.indexTableName = proto.getIndexTableName().toByteArray(); + maintainer.rowKeyOrderOptimizable = proto.getRowKeyOrderOptimizable(); + maintainer.dataEmptyKeyValueCF = proto.getDataTableEmptyKeyValueColFamily().toByteArray(); + ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily(); + maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength()); + maintainer.indexedExpressions = new ArrayList<>(); + try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getIndexedExpressions().toByteArray())) { + DataInput input = new DataInputStream(stream); + while (stream.available() > 0) { + int expressionOrdinal = WritableUtils.readVInt(input); + Expression expression = ExpressionType.values()[expressionOrdinal].newInstance(); + expression.readFields(input); + maintainer.indexedExpressions.add(expression); + } + } + maintainer.rowKeyMetaData = newRowKeyMetaData(maintainer, dataTableRowKeySchema, maintainer.indexedExpressions.size(), isDataTableSalted, maintainer.isMultiTenant); + try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getRowKeyMetadata().toByteArray())) { + DataInput input = new DataInputStream(stream); + maintainer.rowKeyMetaData.readFields(input); + } + maintainer.nDataCFs = proto.getNumDataTableColFamilies(); + maintainer.indexWALDisabled = proto.getIndexWalDisabled(); + maintainer.estimatedIndexRowKeyBytes = proto.getIndexRowKeyByteSize(); + maintainer.immutableRows = proto.getImmutable(); + List<ColumnInfo> indexedColumnInfoList = proto.getIndexedColumnInfoList(); + maintainer.indexedColumnsInfo = Sets.newHashSet(); + for (ColumnInfo info : indexedColumnInfoList) { + maintainer.indexedColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName())); + } + // proto doesn't support single byte so need an explicit cast here + maintainer.encodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme()); + maintainer.immutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme()); + maintainer.isLocalIndex = proto.getIsLocalIndex(); + + List<ServerCachingProtos.ColumnReference> dataTableColRefsForCoveredColumnsList = proto.getDataTableColRefForCoveredColumnsList(); + List<ServerCachingProtos.ColumnReference> indexTableColRefsForCoveredColumnsList = proto.getIndexTableColRefForCoveredColumnsList(); + maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(dataTableColRefsForCoveredColumnsList.size()); + boolean encodedColumnNames = maintainer.encodingScheme != NON_ENCODED_QUALIFIERS; + Iterator<ServerCachingProtos.ColumnReference> indexTableColRefItr = indexTableColRefsForCoveredColumnsList.iterator(); + for (ServerCachingProtos.ColumnReference colRefFromProto : dataTableColRefsForCoveredColumnsList) { + ColumnReference dataTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier( ).toByteArray()); + ColumnReference indexTableColRef; + if (encodedColumnNames) { + ServerCachingProtos.ColumnReference fromProto = indexTableColRefItr.next(); + indexTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier( ).toByteArray()); + } else { + byte[] cq = IndexUtil.getIndexColumnName(dataTableColRef.getFamily(), dataTableColRef.getQualifier()); + byte[] cf = maintainer.isLocalIndex ? IndexUtil.getLocalIndexColumnFamily(dataTableColRef.getFamily()) : dataTableColRef.getFamily(); + indexTableColRef = new ColumnReference(cf, cq); + } + maintainer.coveredColumnsMap.put(dataTableColRef, indexTableColRef); + } + maintainer.initCachedState(); + return maintainer; + } + + @Deprecated // Only called by code older than our 4.10 release @Override public void write(DataOutput output) throws IOException { // Encode nIndexSaltBuckets and isMultiTenant together @@ -1170,8 +1403,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, type.ordinal()); } // Encode coveredColumns.size() and whether or not this is a local index - WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? -1 : 1)); - for (ColumnReference ref : coveredColumns) { + WritableUtils.writeVInt(output, (coveredColumnsMap.size() + 1) * (isLocalIndex ? -1 : 1)); + for (ColumnReference ref : coveredColumnsMap.keySet()) { Bytes.writeByteArray(output, ref.getFamily()); Bytes.writeByteArray(output, ref.getQualifier()); } @@ -1186,8 +1419,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, indexedExpressions.size()); for (Expression expression : indexedExpressions) { - WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); - expression.write(output); + WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); + expression.write(output); } rowKeyMetaData.write(output); @@ -1196,6 +1429,76 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // Encode estimatedIndexRowKeyBytes and immutableRows together. WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * (immutableRows ? -1 : 1)); } + + public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer maintainer) throws IOException { + ServerCachingProtos.IndexMaintainer.Builder builder = ServerCachingProtos.IndexMaintainer.newBuilder(); + builder.setSaltBuckets(maintainer.nIndexSaltBuckets); + builder.setIsMultiTenant(maintainer.isMultiTenant); + if (maintainer.viewIndexId != null) { + builder.setViewIndexId(ByteStringer.wrap(maintainer.viewIndexId)); + } + for (ColumnReference colRef : maintainer.indexedColumns) { + ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier())); + builder.addIndexedColumns(cRefBuilder.build()); + } + for (PDataType dataType : maintainer.indexedColumnTypes) { + builder.addIndexedColumnTypeOrdinal(dataType.ordinal()); + } + for (Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) { + ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + ColumnReference dataTableColRef = e.getKey(); + cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier())); + builder.addDataTableColRefForCoveredColumns(cRefBuilder.build()); + if (maintainer.encodingScheme != NON_ENCODED_QUALIFIERS) { + // We need to serialize the colRefs of index tables only in case of encoded column names. + ColumnReference indexTableColRef = e.getValue(); + cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + cRefBuilder.setFamily(ByteStringer.wrap(indexTableColRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(indexTableColRef.getQualifier())); + builder.addIndexTableColRefForCoveredColumns(cRefBuilder.build()); + } + } + builder.setIsLocalIndex(maintainer.isLocalIndex); + builder.setIndexTableName(ByteStringer.wrap(maintainer.indexTableName)); + builder.setRowKeyOrderOptimizable(maintainer.rowKeyOrderOptimizable); + builder.setDataTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.dataEmptyKeyValueCF)); + ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder(); + ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get())); + ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength()); + ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset()); + builder.setEmptyKeyValueColFamily(ibwBuilder.build()); + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + DataOutput output = new DataOutputStream(stream); + for (Expression expression : maintainer.indexedExpressions) { + WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); + expression.write(output); + } + builder.setIndexedExpressions(ByteStringer.wrap(stream.toByteArray())); + } + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + DataOutput output = new DataOutputStream(stream); + maintainer.rowKeyMetaData.write(output); + builder.setRowKeyMetadata(ByteStringer.wrap(stream.toByteArray())); + } + builder.setNumDataTableColFamilies(maintainer.nDataCFs); + builder.setIndexWalDisabled(maintainer.indexWALDisabled); + builder.setIndexRowKeyByteSize(maintainer.estimatedIndexRowKeyBytes); + builder.setImmutable(maintainer.immutableRows); + for (Pair<String, String> p : maintainer.indexedColumnsInfo) { + ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder(); + if (p.getFirst() != null) { + ciBuilder.setFamilyName(p.getFirst()); + } + ciBuilder.setColumnName(p.getSecond()); + builder.addIndexedColumnInfo(ciBuilder.build()); + } + builder.setEncodingScheme(maintainer.encodingScheme.getSerializedMetadataValue()); + builder.setImmutableStorageScheme(maintainer.immutableStorageScheme.getSerializedMetadataValue()); + return builder.build(); + } public int getEstimatedByteSize() { int size = WritableUtils.getVIntSize(nIndexSaltBuckets); @@ -1212,8 +1515,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { PDataType type = indexedColumnTypes.get(i); size += WritableUtils.getVIntSize(type.ordinal()); } - size += WritableUtils.getVIntSize(coveredColumns.size()); - for (ColumnReference ref : coveredColumns) { + Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet(); + size += WritableUtils.getVIntSize(dataTableColRefs.size()); + for (ColumnReference ref : dataTableColRefs) { size += WritableUtils.getVIntSize(ref.getFamilyWritable().getSize()); size += ref.getFamily().length; size += WritableUtils.getVIntSize(ref.getQualifierWritable().getSize()); @@ -1241,24 +1545,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { * Init calculated state reading/creating */ private void initCachedState() { - dataEmptyKeyValueRef = - new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), - QueryConstants.EMPTY_COLUMN_BYTES); - - indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size()); - for (ColumnReference ref : coveredColumns) { - indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName( - ref.getFamily(), ref.getQualifier()))); - } - - this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size()); + byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst(); + dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier); + this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size()); // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size()); for (Expression expression : indexedExpressions) { KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() { @Override public Void visit(KeyValueColumnExpression expression) { - if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) { + if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) { indexedColumnTypes.add(expression.getDataType()); } return null; @@ -1267,7 +1563,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { expression.accept(visitor); } allColumns.addAll(indexedColumns); - allColumns.addAll(coveredColumns); + allColumns.addAll(coveredColumnsMap.keySet()); int dataPkOffset = (isDataTableSalted ? 1 : 0) + (isMultiTenant ? 1 : 0); int nIndexPkColumns = getIndexPkColumnCount(); @@ -1311,12 +1607,21 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } private int getIndexPkColumnCount() { - return dataRowKeySchema.getFieldCount() + indexedExpressions.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0); + return getIndexPkColumnCount(dataRowKeySchema, indexedExpressions.size(), isDataTableSalted, isMultiTenant); + } + + private static int getIndexPkColumnCount(RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) { + return rowKeySchema.getFieldCount() + numIndexExpressions - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0); } private RowKeyMetaData newRowKeyMetaData() { return getIndexPkColumnCount() < 0xFF ? new ByteSizeRowKeyMetaData() : new IntSizedRowKeyMetaData(); } + + private static RowKeyMetaData newRowKeyMetaData(IndexMaintainer i, RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) { + int indexPkColumnCount = getIndexPkColumnCount(rowKeySchema, numIndexExpressions, isDataTableSalted, isMultiTenant); + return indexPkColumnCount < 0xFF ? i.new ByteSizeRowKeyMetaData() : i.new IntSizedRowKeyMetaData(); + } private RowKeyMetaData newRowKeyMetaData(int capacity) { return capacity < 0xFF ? new ByteSizeRowKeyMetaData(capacity) : new IntSizedRowKeyMetaData(capacity); @@ -1523,4 +1828,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return udfParseNodes; } } + + public byte[] getEmptyKeyValueQualifier() { + return dataEmptyKeyValueRef.getQualifier(); + } + + public Set<Pair<String, String>> getIndexedColumnInfo() { + return indexedColumnsInfo; + } + + public ImmutableStorageScheme getIndexStorageScheme() { + return immutableStorageScheme; + } + }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java index 05a01b9..fcabdfd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java @@ -93,4 +93,5 @@ public class IndexMetaDataCacheClient { */ return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef); } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 56849fe..9edcafc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -47,10 +47,10 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException { + public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { // just use the standard keyvalue builder - this doesn't really need to be fast final List<IndexMaintainer> maintainers = - IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE); + IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer); final Transaction txn; try { txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index bf1d0fb..05211c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -166,7 +166,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { @Override public Void visit(KeyValueColumnExpression expression) { - get.addColumn(expression.getColumnFamily(), expression.getColumnName()); + get.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); estimatedSizeHolder[0]++; return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 9d2955b..4116101 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -38,6 +38,7 @@ import com.google.common.collect.Lists; */ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MD = "IdxMD"; + public static final String INDEX_PROTO_MD = "IdxProtoMD"; public static final String INDEX_UUID = "IdxUUID"; public static final String INDEX_MAINTAINERS = "IndexMaintainers"; private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index e515dbb..5da8be8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -226,4 +226,4 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } return indexTableNames; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index d22e957..39473dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -47,10 +47,15 @@ public class PhoenixIndexMetaData implements IndexMetaData { if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID); if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } - byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD); + boolean useProto = false; + byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD); + useProto = md != null; + if (md == null) { + md = attributes.get(PhoenixIndexCodec.INDEX_MD); + } byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); if (md != null) { - final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md); + final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto); final Transaction txn = MutationState.decodeTransaction(txState); return new IndexMetaDataCache() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index c67da6e..9ee5ea7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -67,7 +67,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; @@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { for (ColumnReference ref : mutableColumns) { scan.addColumn(ref.getFamily(), ref.getQualifier()); } + /* + * Indexes inherit the storage scheme of the data table which means all the indexes have the same + * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start + * supporting new indexes over existing data tables to have a different storage scheme than the data + * table. + */ + byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier(); + // Project empty key value column - scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); scanRanges.initializeScan(scan); TableName tableName = env.getRegion().getRegionInfo().getTable(); @@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException { if (scanner != null) { Result result; - ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0) + .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier()); // Process existing data table rows by removing the old index row and adding the new index row while ((result = scanner.next()) != null) { Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); @@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // to generate point delete markers for all index rows that were added. We don't have Tephra // manage index rows in change sets because we don't want to be hit with the additional // memory hit and do not need to do conflict detection on index rows. - ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier()); while ((result = scanner.next()) != null) { Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index c0f9707..780d70f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -17,11 +17,14 @@ */ package org.apache.phoenix.iterate; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; +import static org.apache.phoenix.schema.PTable.IndexType.LOCAL; +import static org.apache.phoenix.schema.PTableType.INDEX; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; import java.io.ByteArrayInputStream; @@ -78,12 +81,12 @@ import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; @@ -92,6 +95,8 @@ import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PrefixByteCodec; import org.apache.phoenix.util.PrefixByteDecoder; @@ -159,7 +164,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return true; } - private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) { + private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) throws SQLException { StatementContext context = plan.getContext(); TableRef tableRef = plan.getTableRef(); PTable table = tableRef.getTable(); @@ -210,7 +215,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // Project empty key value unless the column family containing it has // been projected in its entirety. if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { - scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); + scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst()); } } } @@ -228,7 +233,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if(offset!=null){ ScanUtil.addOffsetAttribute(scan, offset); } - int cols = plan.getGroupBy().getOrderPreservingColumnCount(); if (cols > 0 && keyOnlyFilter && !plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) && @@ -243,13 +247,90 @@ public abstract class BaseResultIterators extends ExplainTable implements Result ScanUtil.andFilterAtEnd(scan, new PageFilter(plan.getLimit())); } } - + scan.setAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME, new byte[]{table.getEncodingScheme().getSerializedMetadataValue()}); + scan.setAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME, new byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()}); + // When analyzing the table, there is no look up for key values being done. + // So there is no point setting the range. + if (EncodedColumnsUtil.setQualifierRanges(table) && !ScanUtil.isAnalyzeTable(scan)) { + Pair<Integer, Integer> range = getEncodedQualifierRange(scan, context); + if (range != null) { + scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, Bytes.toBytes(range.getFirst())); + scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, Bytes.toBytes(range.getSecond())); + } + } if (optimizeProjection) { optimizeProjection(context, scan, table, statement); } } } + + private static Pair<Integer, Integer> getEncodedQualifierRange(Scan scan, StatementContext context) + throws SQLException { + PTable table = context.getCurrentTable().getTable(); + QualifierEncodingScheme encodingScheme = table.getEncodingScheme(); + checkArgument(encodingScheme != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, + "Method should only be used for tables using encoded column names"); + Pair<Integer, Integer> minMaxQualifiers = new Pair<>(); + for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) { + byte[] cq = whereCol.getSecond(); + if (cq != null) { + int qualifier = table.getEncodingScheme().decode(cq); + determineQualifierRange(qualifier, minMaxQualifiers); + } + } + Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); + Map<String, Pair<Integer, Integer>> qualifierRanges = EncodedColumnsUtil.getFamilyQualifierRanges(table); + for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { + if (entry.getValue() != null) { + for (byte[] cq : entry.getValue()) { + if (cq != null) { + int qualifier = table.getEncodingScheme().decode(cq); + determineQualifierRange(qualifier, minMaxQualifiers); + } + } + } else { + /* + * All the columns of the column family are being projected. So we will need to + * consider all the columns in the column family to determine the min-max range. + */ + String family = Bytes.toString(entry.getKey()); + if (table.getType() == INDEX && table.getIndexType() == LOCAL && !IndexUtil.isLocalIndexFamily(family)) { + //TODO: samarth confirm with James why do we need this hack here :( + family = IndexUtil.getLocalIndexColumnFamily(family); + } + Pair<Integer, Integer> range = qualifierRanges.get(family); + if (range != null) { + determineQualifierRange(range.getFirst(), minMaxQualifiers); + determineQualifierRange(range.getSecond(), minMaxQualifiers); + } + } + } + if (minMaxQualifiers.getFirst() == null) { + return null; + } + return minMaxQualifiers; + } + + /** + * + * @param cq + * @param minMaxQualifiers + * @return true if the empty column was projected + */ + private static void determineQualifierRange(Integer qualifier, Pair<Integer, Integer> minMaxQualifiers) { + if (minMaxQualifiers.getFirst() == null) { + minMaxQualifiers.setFirst(qualifier); + minMaxQualifiers.setSecond(qualifier); + } else { + if (minMaxQualifiers.getFirst() > qualifier) { + minMaxQualifiers.setFirst(qualifier); + } else if (minMaxQualifiers.getSecond() < qualifier) { + minMaxQualifiers.setSecond(qualifier); + } + } + } + private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); // columnsTracker contain cf -> qualifiers which should get returned. @@ -346,7 +427,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // the ExplicitColumnTracker not to be used, though. if (!statement.isAggregate() && filteredColumnNotInProjection) { ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), - columnsTracker, conditionOnlyCfs)); + columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme()))); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java index 3293f65..1e5f09e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java @@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator { }; } - private final static Tuple UNINITIALIZED = new ResultTuple(); + private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE; private Tuple next = UNINITIALIZED; abstract protected Tuple advance() throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java index 8ada952..135ab26 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java @@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { return this.index; } + @Override public int size() { if (flushBuffer) return flushedCount; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 8dcb2e8..e4c52c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.SizedUtil; import com.google.common.base.Function; @@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator { } this.byteSize = queueEntries.getByteSize(); } catch (IOException e) { - throw new SQLException("", e); + ServerUtil.createIOException(e.getMessage(), e); } finally { delegate.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index 88e141a..4b89133 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -24,16 +24,27 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ServerUtil; public class RegionScannerResultIterator extends BaseResultIterator { private final RegionScanner scanner; + private final Pair<Integer, Integer> minMaxQualifiers; + private final boolean useQualifierAsIndex; + private final QualifierEncodingScheme encodingScheme; - public RegionScannerResultIterator(RegionScanner scanner) { + public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) { this.scanner = scanner; + this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); + this.minMaxQualifiers = minMaxQualifiers; + this.encodingScheme = encodingScheme; } @Override @@ -43,7 +54,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { synchronized (scanner) { try { // TODO: size - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); // Results are potentially returned even when the return value of s.next is false // since this is an indication of whether or not there are more values after the // ones returned @@ -53,7 +64,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { } // We instantiate a new tuple because in all cases currently we hang on to it // (i.e. to compute and hold onto the TopN). - MultiKeyValueTuple tuple = new MultiKeyValueTuple(); + Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); tuple.setKeyValues(results); return tuple; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index ebb2421..0bf7a71 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT); public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP"; public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP); - + public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY; public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; @@ -322,6 +322,15 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { /** Version below which we fall back on the generic KeyValueBuilder */ public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14"); + public static final String IMMUTABLE_STORAGE_SCHEME = "IMMUTABLE_STORAGE_SCHEME"; + public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(IMMUTABLE_STORAGE_SCHEME); + public static final String ENCODING_SCHEME = "ENCODING_SCHEME"; + public static final byte[] ENCODING_SCHEME_BYTES = Bytes.toBytes(ENCODING_SCHEME); + public static final String COLUMN_QUALIFIER = "COLUMN_QUALIFIER"; + public static final byte[] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER); + public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER"; + public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER); + PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException { this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false)); this.connection = connection; @@ -595,9 +604,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { newCells.addAll(cells); newCells.add(kv); Collections.sort(newCells, KeyValue.COMPARATOR); - resultTuple.setResult(Result.create(newCells)); + tuple = new ResultTuple(Result.create(newCells)); } - return tuple; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 47c17ae..3ca48a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { private final static String STRING_FALSE = "0"; private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0); private final static Integer INTEGER_FALSE = Integer.valueOf(0); - private final static Tuple BEFORE_FIRST = new ResultTuple(); + private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE; private final ResultIterator scanner; private final RowProjector rowProjector; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 908a117..921b412 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -69,7 +69,7 @@ public class HashCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException { + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { try { // This reads the uncompressed length from the front of the compressed input int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset());