http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java new file mode 100644 index 0000000..00e662f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.filter; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.BitSet; +import java.util.NoSuchElementException; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.expression.visitor.ExpressionVisitor; +import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.tuple.BaseTuple; + +/** + * Filter used for tables that use number based column qualifiers generated by one of the encoding schemes in + * {@link QualifierEncodingScheme}. Because the qualifiers are number based, instead of using a map of cells to track + * the columns that have been found, we can use an array of cells where the index into the array would be derived by the + * number based column qualifier. See {@link EncodedCQIncrementalResultTuple}. Using this filter helps us to directly + * seek to the next row when the column qualifier that we have encountered is greater than the maxQualifier that we + * expect. This helps in speeding up the queries filtering on key value columns. + */ +public class MultiEncodedCQKeyValueComparisonFilter extends BooleanExpressionFilter { + // Smallest qualifier for the columns that are being projected and filtered on + private int minQualifier; + + // Largest qualifier for the columns that are being projected and filtered on + private int maxQualifier; + + private QualifierEncodingScheme encodingScheme; + + // Smallest qualifier for the columns in where expression + private int whereExpressionMinQualifier; + + // Largest qualifier for the columns in where expression + private int whereExpressionMaxQualifier; + + private FilteredKeyValueHolder filteredKeyValues; + + // BitSet to track the qualifiers in where expression that we expect to find while filtering a row + private BitSet whereExpressionQualifiers; + + // Set to track the column families of the columns in where expression + private TreeSet<byte[]> cfSet; + + // Boolean that tells us whether the result of expression evaluation as and when we filter key values in a row + private Boolean matchedColumn; + + // Tuple used to store the relevant key values found while filtering a row + private EncodedCQIncrementalResultTuple inputTuple = new EncodedCQIncrementalResultTuple(); + + // Member variable to cache the size of whereExpressionQualifiers + private int expectedCardinality; + + private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; + + public MultiEncodedCQKeyValueComparisonFilter() {} + + public MultiEncodedCQKeyValueComparisonFilter(Expression expression, QualifierEncodingScheme scheme) { + super(expression); + checkArgument(scheme != NON_ENCODED_QUALIFIERS, "Filter can only be used for encoded qualifiers"); + this.encodingScheme = scheme; + initFilter(expression); + } + + private final class FilteredKeyValueHolder { + // Cell values corresponding to columns in where expression that were found while filtering a row. + private Cell[] filteredCells; + + // BitSet to track whether qualifiers in where expression were found when filtering a row + private BitSet filteredQualifiers; + + // Using an explicit counter instead of relying on the cardinality of the bitset as computing the + // cardinality could be slightly more expensive than just incrementing an integer + private int numKeyValues; + + private FilteredKeyValueHolder(int size) { + filteredCells = new Cell[size]; + filteredQualifiers = new BitSet(size); + } + + private void setCell(int qualifier, Cell c) { + int index = qualifier - whereExpressionMinQualifier; + filteredCells[index] = c; + filteredQualifiers.set(index); + numKeyValues++; + } + + private Cell getCell(int qualifier) { + int index = qualifier - whereExpressionMinQualifier; + return filteredQualifiers.get(index) ? filteredCells[index] : null; + } + + private void clear() { + // Note here that we are only clearing out the filteredQualifiers bitset. We are not setting all the + // entries in filteredKeyValues to null or allocating a new Cell array as that would be expensive. + filteredQualifiers.clear(); + numKeyValues = 0; + } + + /** + * This method really shouldn't be the way for getting hold of cells. It was + * just added to keep the tuple.get(index) method happy. + */ + public Cell getCellAtIndex(int index) { + int bitIndex; + for (bitIndex = filteredQualifiers.nextSetBit(0); bitIndex >= 0 && index >= 0; bitIndex = filteredQualifiers + .nextSetBit(bitIndex + 1)) { + index--; + } + if (bitIndex < 0) { throw new NoSuchElementException(); } + return filteredCells[bitIndex]; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(100); + int length = filteredQualifiers.length(); + for (int i = 0; i < length; i++) { + sb.append(filteredCells[i].toString()); + } + return sb.toString(); + } + + private boolean allColumnsFound() { + return numKeyValues == expectedCardinality; + } + + private int numKeyValues() { + return numKeyValues; + } + + } + + private void initFilter(Expression expression) { + cfSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + final BitSet expressionQualifiers = new BitSet(20); + final Pair<Integer, Integer> range = new Pair<>(); + ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { + @Override + public Void visit(KeyValueColumnExpression expression) { + int qualifier = encodingScheme.decode(expression.getColumnQualifier()); + if (range.getFirst() == null) { + range.setFirst(qualifier); + range.setSecond(qualifier); + } else if (qualifier < range.getFirst()) { + range.setFirst(qualifier); + } else if (qualifier > range.getSecond()) { + range.setSecond(qualifier); + } + cfSet.add(expression.getColumnFamily()); + expressionQualifiers.set(qualifier); + return null; + } + }; + expression.accept(visitor); + // Set min and max qualifiers for columns in the where expression + whereExpressionMinQualifier = range.getFirst(); + whereExpressionMaxQualifier = range.getSecond(); + + int size = whereExpressionMaxQualifier - whereExpressionMinQualifier + 1; + filteredKeyValues = new FilteredKeyValueHolder(size); + + // Initialize the bitset and mark the qualifiers for columns in where expression + whereExpressionQualifiers = new BitSet(size); + for (int i = whereExpressionMinQualifier; i <= whereExpressionMaxQualifier; i++) { + if (expressionQualifiers.get(i)) { + whereExpressionQualifiers.set(i - whereExpressionMinQualifier); + } + } + expectedCardinality = whereExpressionQualifiers.cardinality(); + } + + private boolean isQualifierForColumnInWhereExpression(int qualifier) { + return qualifier >= whereExpressionMinQualifier ? whereExpressionQualifiers.get(qualifier - whereExpressionMinQualifier) : false; + } + + @Override + public ReturnCode filterKeyValue(Cell cell) { + if (Boolean.TRUE.equals(this.matchedColumn)) { + // We already found and matched the single column, all keys now pass + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + if (Boolean.FALSE.equals(this.matchedColumn)) { + // We found all the columns, but did not match the expression, so skip to next row + return ReturnCode.NEXT_ROW; + } + inputTuple.setKey(cell); + int qualifier = encodingScheme.decode(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + if (isQualifierForColumnInWhereExpression(qualifier)) { + filteredKeyValues.setCell(qualifier, cell); + // We found a new column, so we can re-evaluate + this.matchedColumn = this.evaluate(inputTuple); + if (this.matchedColumn == null) { + if (inputTuple.isImmutable()) { + this.matchedColumn = Boolean.FALSE; + } else { + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + } + return this.matchedColumn ? ReturnCode.INCLUDE_AND_NEXT_COL : ReturnCode.NEXT_ROW; + } + // The qualifier is not one of the qualifiers in the expression. So decide whether + // we would need to include it in our result. + if (qualifier < minQualifier) { + // Qualifier is smaller than the minimum expected qualifier. Look at the next column. + return ReturnCode.NEXT_COL; + } + // TODO: I don't think we would ever hit this case of encountering a greater than what we expect. + // Leaving the code commented out here for future reference. + // if (qualifier > maxQualifier) { + // Qualifier is larger than the max expected qualifier. We are done looking at columns in this row. + // return ReturnCode.NEXT_ROW; + // } + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + + @Override + public boolean filterRow() { + if (this.matchedColumn == null && !inputTuple.isImmutable() && expression.requiresFinalEvaluation()) { + inputTuple.setImmutable(); + this.matchedColumn = this.evaluate(inputTuple); + } + return ! (Boolean.TRUE.equals(this.matchedColumn)); + } + + final class EncodedCQIncrementalResultTuple extends BaseTuple { + private final ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER); + private boolean isImmutable; + + @Override + public boolean isImmutable() { + return isImmutable || filteredKeyValues.allColumnsFound(); + } + + public void setImmutable() { + this.isImmutable = true; + } + + private void setKey(Cell value) { + keyPtr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength()); + } + + @Override + public void getKey(ImmutableBytesWritable ptr) { + ptr.set(keyPtr.get(),keyPtr.getOffset(),keyPtr.getLength()); + } + + @Override + public Cell getValue(byte[] cf, byte[] cq) { + int qualifier = encodingScheme.decode(cq); + return filteredKeyValues.getCell(qualifier); + } + + @Override + public String toString() { + return filteredKeyValues.toString(); + } + + @Override + public int size() { + return filteredKeyValues.numKeyValues(); + } + + /** + * This method doesn't perform well and shouldn't be the way of + * getting hold of elements in the tuple. + */ + @Override + public Cell getValue(int index) { + return filteredKeyValues.getCellAtIndex(index); + } + + @Override + public boolean getValue(byte[] family, byte[] qualifier, + ImmutableBytesWritable ptr) { + Cell cell = getValue(family, qualifier); + if (cell == null) + return false; + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + return true; + } + + void reset() { + isImmutable = false; + keyPtr.set(UNITIALIZED_KEY_BUFFER); + } + } + + @Override + public void readFields(DataInput input) throws IOException { + this.minQualifier = WritableUtils.readVInt(input); + this.maxQualifier = WritableUtils.readVInt(input); + this.whereExpressionMinQualifier = WritableUtils.readVInt(input); + this.whereExpressionMaxQualifier = WritableUtils.readVInt(input); + this.encodingScheme = QualifierEncodingScheme.values()[WritableUtils.readVInt(input)]; + super.readFields(input); + initFilter(expression); + } + + @Override + public void write(DataOutput output) throws IOException { + WritableUtils.writeVInt(output, minQualifier); + WritableUtils.writeVInt(output, maxQualifier); + WritableUtils.writeVInt(output, whereExpressionMinQualifier); + WritableUtils.writeVInt(output, whereExpressionMaxQualifier); + WritableUtils.writeVInt(output, encodingScheme.ordinal()); + super.write(output); + } + + public void setMinMaxQualifierRange(Pair<Integer, Integer> minMaxQualifiers) { + this.minQualifier = minMaxQualifiers.getFirst(); + this.maxQualifier = minMaxQualifiers.getSecond(); + } + + public static MultiEncodedCQKeyValueComparisonFilter parseFrom(final byte [] pbBytes) throws DeserializationException { + try { + return (MultiEncodedCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new MultiEncodedCQKeyValueComparisonFilter()); + } catch (IOException e) { + throw new DeserializationException(e); + } + } + + @Override + public void reset() { + filteredKeyValues.clear(); + matchedColumn = null; + inputTuple.reset(); + super.reset(); + } + + @Override + public boolean isFamilyEssential(byte[] name) { + // Only the column families involved in the expression are essential. + // The others are for columns projected in the select expression. + return cfSet.contains(name); + } + + +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java index dba700b..00ecd9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java @@ -94,7 +94,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil refCount = foundColumns.size(); } - public ReturnCode resolveColumn(Cell value) { + private ReturnCode resolveColumn(Cell value) { // Always set key, in case we never find a key value column of interest, // and our expression uses row key columns. setKey(value); @@ -184,7 +184,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { @Override public Void visit(KeyValueColumnExpression expression) { - inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnName()); + inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); return null; } }; @@ -231,7 +231,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil return ! (Boolean.TRUE.equals(this.matchedColumn)); } - @Override + @Override public void reset() { matchedColumn = null; inputTuple.reset(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java index 0d904bc..195c89c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java @@ -47,7 +47,8 @@ public class SingleCQKeyValueComparisonFilter extends SingleKeyValueComparisonFi public static SingleCQKeyValueComparisonFilter parseFrom(final byte [] pbBytes) throws DeserializationException { try { - return (SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new SingleCQKeyValueComparisonFilter()); + SingleCQKeyValueComparisonFilter writable = (SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new SingleCQKeyValueComparisonFilter()); + return writable; } catch (IOException e) { throw new DeserializationException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java index eaf8d35..ae3557d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java @@ -22,11 +22,13 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; import org.apache.phoenix.expression.visitor.TraverseAllExpressionVisitor; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; @@ -58,7 +60,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi @Override public Void visit(KeyValueColumnExpression expression) { cf = expression.getColumnFamily(); - cq = expression.getColumnName(); + cq = expression.getColumnQualifier(); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java index bcadc2b..af847b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java @@ -35,4 +35,5 @@ public interface ValueGetter { public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException; public byte[] getRowKey(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java index 741bf87..9433abf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java @@ -125,4 +125,5 @@ public abstract class KeyValueBuilder { public abstract KVComparator getKeyValueComparator(); public abstract List<Mutation> cloneIfNecessary(List<Mutation> mutations); + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 237ed75..6061dd9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -17,7 +17,10 @@ */ package org.apache.phoenix.index; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; @@ -29,9 +32,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hbase.Cell; @@ -43,17 +48,24 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexExpressionCompiler; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.generated.ServerCachingProtos; +import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ColumnInfo; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.expression.SingleCellColumnExpression; +import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -67,14 +79,17 @@ import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor; import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.AmbiguousColumnException; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SaltingUtil; @@ -82,10 +97,12 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; @@ -93,6 +110,7 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; import org.apache.tephra.TxConstants; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -105,10 +123,10 @@ import com.google.common.collect.Sets; * row and caches any covered columns. Client-side serializes into byte array using * @link #serialize(PTable, ImmutableBytesWritable)} * and transmits to server-side through either the - * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_MD} + * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_PROTO_MD} * Mutation attribute or as a separate RPC call using * {@link org.apache.phoenix.cache.ServerCacheClient}) - * + * * * @since 2.1.0 */ @@ -116,8 +134,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private static final int EXPRESSION_NOT_PRESENT = -1; private static final int ESTIMATED_EXPRESSION_SIZE = 8; - - public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) { + + public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) { if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) { throw new IllegalArgumentException(); } @@ -179,14 +197,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } } int nIndexes = 0; - int estimatedSize = dataTable.getRowKeySchema().getEstimatedByteSize() + 2; while (indexesItr.hasNext()) { nIndexes++; - PTable index = indexesItr.next(); - estimatedSize += index.getIndexMaintainer(dataTable, connection).getEstimatedByteSize(); + indexesItr.next(); } - TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedSize + 1); - DataOutput output = new DataOutputStream(stream); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(stream); try { // Encode data table salting in sign of number of indexes WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1)); @@ -196,15 +212,23 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { dataTable.isImmutableRows() ? enabledLocalIndexIterator(indexes.iterator()) : nonDisabledIndexIterator(indexes.iterator()); while (indexesItr.hasNext()) { - indexesItr.next().getIndexMaintainer(dataTable, connection).write(output); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection)); + byte[] protoBytes = proto.toByteArray(); + WritableUtils.writeVInt(output, protoBytes.length); + output.write(protoBytes); } } catch (IOException e) { throw new RuntimeException(e); // Impossible } - ptr.set(stream.getBuffer(), 0, stream.size()); + ptr.set(stream.toByteArray(), 0, stream.size()); } - + /** + * For client-side to append serialized IndexMaintainers of keyValueIndexes + * @param dataTable data table + * @param indexMetaDataPtr bytes pointer to hold returned serialized value + * @param keyValueIndexes indexes to serialize + */ public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr, List<PTable> keyValueIndexes, PhoenixConnection connection) { int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr); @@ -230,7 +254,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } // Serialize mutable indexes afterwards for (PTable index : keyValueIndexes) { - index.getIndexMaintainer(table, connection).write(output); + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + byte[] protoBytes = IndexMaintainer.toProto(maintainer).toByteArray(); + WritableUtils.writeVInt(output, protoBytes.length); + output.write(protoBytes); } } catch (IOException e) { throw new RuntimeException(e); // Impossible @@ -239,15 +266,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr, - KeyValueBuilder builder) { - return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength()); + KeyValueBuilder builder, boolean useProtoForIndexMaintainer) { + return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength(), useProtoForIndexMaintainer); } - public static List<IndexMaintainer> deserialize(byte[] buf) { - return deserialize(buf, 0, buf.length); + public static List<IndexMaintainer> deserialize(byte[] buf, boolean useProtoForIndexMaintainer) { + return deserialize(buf, 0, buf.length, useProtoForIndexMaintainer); } - public static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) { + private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) { ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); DataInput input = new DataInputStream(stream); List<IndexMaintainer> maintainers = Collections.emptyList(); @@ -259,25 +286,31 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { rowKeySchema.readFields(input); maintainers = Lists.newArrayListWithExpectedSize(size); for (int i = 0; i < size; i++) { - IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); - maintainer.readFields(input); - maintainers.add(maintainer); + if (useProtoForIndexMaintainer) { + int protoSize = WritableUtils.readVInt(input); + byte[] b = new byte[protoSize]; + input.readFully(b); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); + maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); + } else { + IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); + maintainer.readFields(input); + maintainers.add(maintainer); + } } } catch (IOException e) { throw new RuntimeException(e); // Impossible } return maintainers; } - + private byte[] viewIndexId; private boolean isMultiTenant; // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column private List<Expression> indexedExpressions; // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; - private Set<ColumnReference> coveredColumns; - // Map used to cache column family of data table and the corresponding column family for the local index - private Map<ImmutableBytesPtr, ImmutableBytesWritable> dataTableLocalIndexFamilyMap; + // columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -291,12 +324,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private boolean indexWALDisabled; private boolean isLocalIndex; private boolean immutableRows; - // Transient state private final boolean isDataTableSalted; private final RowKeySchema dataRowKeySchema; - private List<ImmutableBytesPtr> indexQualifiers; private int estimatedIndexRowKeyBytes; private int estimatedExpressionSize; private int[] dataPkPosition; @@ -304,26 +335,48 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private ColumnReference dataEmptyKeyValueRef; private boolean rowKeyOrderOptimizable; + /**** START: New member variables added in 4.10 *****/ + private QualifierEncodingScheme encodingScheme; + private ImmutableStorageScheme immutableStorageScheme; + /* + * Information for columns of data tables that are being indexed. The first part of the pair is column family name + * and second part is the column name. The reason we need to track this state is because for certain storage schemes + * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate an index + * table put/delete is different from the columns that are indexed in the phoenix schema. This information helps us + * determine whether or not certain operations like DROP COLUMN should impact the index. + */ + private Set<Pair<String, String>> indexedColumnsInfo; + /* + * Map of covered columns where a key is column reference for a column in the data table + * and value is column reference for corresponding column in the index table. + */ + private Map<ColumnReference, ColumnReference> coveredColumnsMap; + /**** END: New member variables added in 4.10 *****/ + private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) { this.dataRowKeySchema = dataRowKeySchema; this.isDataTableSalted = isDataTableSalted; } - - private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) { + + private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null); assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW); this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable(); this.isMultiTenant = dataTable.isMultiTenant(); this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; - + this.encodingScheme = index.getEncodingScheme(); + + // null check for b/w compatibility + this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme(); + this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme(); + byte[] indexTableName = index.getPhysicalName().getBytes(); // Use this for the nDataSaltBuckets as we need this for local indexes // TODO: persist nDataSaltBuckets separately, but maintain b/w compat. Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum(); boolean indexWALDisabled = index.isWALDisabled(); int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1); -// int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0; int nIndexColumns = index.getColumns().size() - indexPosOffset; int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset; // number of expressions that are indexed that are not present in the row key of the data table @@ -334,7 +387,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); try { - PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName); + PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName); if (SchemaUtil.isPKColumn(dataColumn)) continue; } catch (ColumnNotFoundException e) { @@ -366,8 +419,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexTableName = indexTableName; this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); - this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns); - this.dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns); + this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns); this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index); @@ -397,6 +449,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver); + this.indexedColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); + IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context); for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) { PColumn indexColumn = index.getPKColumns().get(i); @@ -409,12 +463,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } if ( expressionIndexCompiler.getColumnRef()!=null ) { - // get the column of the data table that corresponds to this index column + // get the column of the data column that corresponds to this index column PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); boolean isPKColumn = SchemaUtil.isPKColumn(column); if (isPKColumn) { int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0); this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos); + indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString())); } else { indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; try { @@ -424,6 +479,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { expression = CoerceExpression.create(expression, indexColumn.getDataType()); } this.indexedExpressions.add(expression); + indexedColumnsInfo.add(new Pair<>(column.getFamilyName().getString(), column.getName().getString())); } catch (SQLException e) { throw new RuntimeException(e); // Impossible } @@ -432,6 +488,45 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { else { indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; this.indexedExpressions.add(expression); + KeyValueExpressionVisitor kvVisitor = new KeyValueExpressionVisitor() { + @Override + public Void visit(KeyValueColumnExpression colExpression) { + return addDataColInfo(dataTable, colExpression); + } + + @Override + public Void visit(SingleCellColumnExpression expression) { + return addDataColInfo(dataTable, expression); + } + + private Void addDataColInfo(final PTable dataTable, Expression expression) { + Preconditions.checkArgument(expression instanceof SingleCellColumnExpression + || expression instanceof KeyValueColumnExpression); + + KeyValueColumnExpression colExpression = null; + if (expression instanceof SingleCellColumnExpression) { + colExpression = + ((SingleCellColumnExpression) expression).getKeyValueExpression(); + } else { + colExpression = ((KeyValueColumnExpression) expression); + } + byte[] cf = colExpression.getColumnFamily(); + byte[] cq = colExpression.getColumnQualifier(); + try { + PColumn dataColumn = + cf == null ? dataTable.getColumnForColumnQualifier(null, cq) + : dataTable.getColumnFamily(cf) + .getPColumnForColumnQualifier(cq); + indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName() + .getString(), dataColumn.getName().getString())); + } catch (ColumnNotFoundException | ColumnFamilyNotFoundException + | AmbiguousColumnException e) { + throw new RuntimeException(e); + } + return null; + } + }; + expression.accept(kvVisitor); } // set the sort order of the expression correctly if (indexColumn.getSortOrder() == SortOrder.DESC) { @@ -442,18 +537,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (int i = 0; i < index.getColumnFamilies().size(); i++) { PColumnFamily family = index.getColumnFamilies().get(i); for (PColumn indexColumn : family.getColumns()) { - PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); - PName dataTableFamily = column.getFamilyName(); - this.coveredColumns.add(new ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes())); - if(isLocalIndex) { - this.dataTableLocalIndexFamilyMap.put(new ImmutableBytesPtr(dataTableFamily.getBytes()), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString())))); - } + PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); + byte[] dataColumnCq = dataColumn.getColumnQualifierBytes(); + byte[] indexColumnCq = indexColumn.getColumnQualifierBytes(); + this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), + new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq)); } } this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize); initCachedState(); } - + public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); boolean prependRegionStartKey = isLocalIndex && regionStartKey != null; @@ -854,37 +948,106 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return indexRowKeySchema; } - + public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); Put put = null; // New row being inserted: add the empty key value if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) { - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts, + this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, // set the value to the empty column name - QueryConstants.EMPTY_COLUMN_BYTES_PTR)); + dataEmptyKeyValueRef.getQualifierWritable())); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - int i = 0; - for (ColumnReference ref : this.getCoveredColumns()) { - ImmutableBytesPtr cq = this.indexQualifiers.get(i++); - ImmutableBytesWritable value = valueGetter.getLatestValue(ref); - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); - if (value != null) { + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); + if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { + // map from index column family to list of pair of index column and data column (for covered columns) + Map<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap(); + for (ColumnReference ref : this.getCoveredColumns()) { + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily()); + if (!familyToColListMap.containsKey(cf)) { + familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList()); + } + familyToColListMap.get(cf).add(Pair.newPair(indexColRef, ref)); + } + // iterate over each column family and create a byte[] containing all the columns + for (Entry<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> entry : familyToColListMap.entrySet()) { + byte[] columnFamily = entry.getKey().copyBytesIfNecessary(); + List<Pair<ColumnReference, ColumnReference>> colRefPairs = entry.getValue(); + int maxEncodedColumnQualifier = Integer.MIN_VALUE; + // find the max col qualifier + for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) { + maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, encodingScheme.decode(colRefPair.getFirst().getQualifier())); + } + Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier); + // set the values of the columns + for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) { + ColumnReference indexColRef = colRefPair.getFirst(); + ColumnReference dataColRef = colRefPair.getSecond(); + Expression expression = new SingleCellColumnExpression(new PDatum() { + @Override + public boolean isNullable() { + return false; + } + + @Override + public SortOrder getSortOrder() { + return null; + } + + @Override + public Integer getScale() { + return null; + } + + @Override + public Integer getMaxLength() { + return null; + } + + @Override + public PDataType getDataType() { + return null; + } + }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme); + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); + expression.evaluate(new ValueGetterTuple(valueGetter), ptr); + byte[] value = ptr.copyBytesIfNecessary(); + if (value != null) { + int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1; + colValues[indexArrayPos] = new LiteralExpression(value); + } + } + + List<Expression> children = Arrays.asList(colValues); + // we use SingleCellConstructorExpression to serialize multiple columns into a single byte[] + SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + singleCellConstructorExpression.evaluate(new BaseTuple() {}, ptr); if (put == null) { put = new Put(indexRowKey); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } + ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily); //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC - if(this.isLocalIndex) { - ImmutableBytesWritable localIndexColFamily = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()); - put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, cq, ts, value)); - } else { - put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); + put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr)); + } + } else { + for (ColumnReference ref : this.getCoveredColumns()) { + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); + ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); + ImmutableBytesWritable value = valueGetter.getLatestValue(ref); + if (value != null) { + if (put == null) { + put = new Put(indexRowKey); + put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); + } + put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); } } } @@ -962,7 +1125,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<KeyValue>emptyList(), ts, null, null); } - @SuppressWarnings("deprecation") public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey); // Delete the entire row if any of the indexed columns changed @@ -972,15 +1134,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { Delete delete = new Delete(indexRowKey); for (ColumnReference ref : getCoveredColumns()) { - byte[] family = ref.getFamily(); - if (this.isLocalIndex) { - family = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get(); - } + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { - delete.deleteFamilyVersion(family, ts); + delete.deleteFamilyVersion(indexColumn.getFamily(), ts); } else { - delete.deleteFamily(family, ts); + delete.deleteFamily(indexColumn.getFamily(), ts); } } if (deleteType == DeleteType.SINGLE_VERSION) { @@ -992,34 +1151,35 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return delete; } Delete delete = null; + Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet(); // Delete columns for missing key values for (Cell kv : pendingUpdates) { if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) { ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier()); - if (coveredColumns.contains(ref)) { + if (dataTableColRefs.contains(ref)) { if (delete == null) { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - byte[] family = this.isLocalIndex ? this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : ref.getFamily(); + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If point delete for data table, then use point delete for index as well - if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { - delete.deleteColumn(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { + delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } else { - delete.deleteColumns(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } } } } return delete; } - + public byte[] getIndexTableName() { return indexTableName; } public Set<ColumnReference> getCoveredColumns() { - return coveredColumns; + return coveredColumnsMap.keySet(); } public Set<ColumnReference> getAllColumns() { @@ -1032,7 +1192,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // If if there are no covered columns, we know it's our default name return emptyKeyValueCFPtr; } - + + @Deprecated // Only called by code older than our 4.10 release @Override public void readFields(DataInput input) throws IOException { int encodedIndexSaltBucketsAndMultiTenant = WritableUtils.readVInt(input); @@ -1060,16 +1221,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int encodedCoveredolumnsAndLocalIndex = WritableUtils.readVInt(input); isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0; int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1; - coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns); - dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); + coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); for (int i = 0; i < nCoveredColumns; i++) { - byte[] cf = Bytes.readByteArray(input); - byte[] cq = Bytes.readByteArray(input); - ColumnReference ref = new ColumnReference(cf,cq); - coveredColumns.add(ref); - if(isLocalIndex) { - dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))))); - } + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + ColumnReference dataTableRef = new ColumnReference(dataTableCf, dataTableCq); + byte[] indexTableCf = isLocalIndex ? IndexUtil.getLocalIndexColumnFamily(dataTableCf) : dataTableCf; + byte[] indexTableCq = IndexUtil.getIndexColumnName(dataTableCf, dataTableCq); + ColumnReference indexTableRef = new ColumnReference(indexTableCf, indexTableCq); + coveredColumnsMap.put(dataTableRef, indexTableRef); } // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); @@ -1097,9 +1257,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int numIndexedExpressions = WritableUtils.readVInt(input); indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions); for (int i = 0; i < numIndexedExpressions; i++) { - Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); - expression.readFields(input); - indexedExpressions.add(expression); + Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); + expression.readFields(input); + indexedExpressions.add(expression); } } else { @@ -1151,6 +1311,79 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { initCachedState(); } + + public static IndexMaintainer fromProto(ServerCachingProtos.IndexMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException { + IndexMaintainer maintainer = new IndexMaintainer(dataTableRowKeySchema, isDataTableSalted); + maintainer.nIndexSaltBuckets = proto.getSaltBuckets(); + maintainer.isMultiTenant = proto.getIsMultiTenant(); + maintainer.viewIndexId = proto.hasViewIndexId() ? proto.getViewIndexId().toByteArray() : null; + List<ServerCachingProtos.ColumnReference> indexedColumnsList = proto.getIndexedColumnsList(); + maintainer.indexedColumns = new HashSet<ColumnReference>(indexedColumnsList.size()); + for (ServerCachingProtos.ColumnReference colRefFromProto : indexedColumnsList) { + maintainer.indexedColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray())); + } + List<Integer> indexedColumnTypes = proto.getIndexedColumnTypeOrdinalList(); + maintainer.indexedColumnTypes = new ArrayList<PDataType>(indexedColumnTypes.size()); + for (Integer typeOrdinal : indexedColumnTypes) { + maintainer.indexedColumnTypes.add(PDataType.values()[typeOrdinal]); + } + maintainer.indexTableName = proto.getIndexTableName().toByteArray(); + maintainer.rowKeyOrderOptimizable = proto.getRowKeyOrderOptimizable(); + maintainer.dataEmptyKeyValueCF = proto.getDataTableEmptyKeyValueColFamily().toByteArray(); + ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily(); + maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength()); + maintainer.indexedExpressions = new ArrayList<>(); + try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getIndexedExpressions().toByteArray())) { + DataInput input = new DataInputStream(stream); + while (stream.available() > 0) { + int expressionOrdinal = WritableUtils.readVInt(input); + Expression expression = ExpressionType.values()[expressionOrdinal].newInstance(); + expression.readFields(input); + maintainer.indexedExpressions.add(expression); + } + } + maintainer.rowKeyMetaData = newRowKeyMetaData(maintainer, dataTableRowKeySchema, maintainer.indexedExpressions.size(), isDataTableSalted, maintainer.isMultiTenant); + try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getRowKeyMetadata().toByteArray())) { + DataInput input = new DataInputStream(stream); + maintainer.rowKeyMetaData.readFields(input); + } + maintainer.nDataCFs = proto.getNumDataTableColFamilies(); + maintainer.indexWALDisabled = proto.getIndexWalDisabled(); + maintainer.estimatedIndexRowKeyBytes = proto.getIndexRowKeyByteSize(); + maintainer.immutableRows = proto.getImmutable(); + List<ColumnInfo> indexedColumnInfoList = proto.getIndexedColumnInfoList(); + maintainer.indexedColumnsInfo = Sets.newHashSet(); + for (ColumnInfo info : indexedColumnInfoList) { + maintainer.indexedColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName())); + } + // proto doesn't support single byte so need an explicit cast here + maintainer.encodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme()); + maintainer.immutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme()); + maintainer.isLocalIndex = proto.getIsLocalIndex(); + + List<ServerCachingProtos.ColumnReference> dataTableColRefsForCoveredColumnsList = proto.getDataTableColRefForCoveredColumnsList(); + List<ServerCachingProtos.ColumnReference> indexTableColRefsForCoveredColumnsList = proto.getIndexTableColRefForCoveredColumnsList(); + maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(dataTableColRefsForCoveredColumnsList.size()); + boolean encodedColumnNames = maintainer.encodingScheme != NON_ENCODED_QUALIFIERS; + Iterator<ServerCachingProtos.ColumnReference> indexTableColRefItr = indexTableColRefsForCoveredColumnsList.iterator(); + for (ServerCachingProtos.ColumnReference colRefFromProto : dataTableColRefsForCoveredColumnsList) { + ColumnReference dataTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier( ).toByteArray()); + ColumnReference indexTableColRef; + if (encodedColumnNames) { + ServerCachingProtos.ColumnReference fromProto = indexTableColRefItr.next(); + indexTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier( ).toByteArray()); + } else { + byte[] cq = IndexUtil.getIndexColumnName(dataTableColRef.getFamily(), dataTableColRef.getQualifier()); + byte[] cf = maintainer.isLocalIndex ? IndexUtil.getLocalIndexColumnFamily(dataTableColRef.getFamily()) : dataTableColRef.getFamily(); + indexTableColRef = new ColumnReference(cf, cq); + } + maintainer.coveredColumnsMap.put(dataTableColRef, indexTableColRef); + } + maintainer.initCachedState(); + return maintainer; + } + + @Deprecated // Only called by code older than our 4.10 release @Override public void write(DataOutput output) throws IOException { // Encode nIndexSaltBuckets and isMultiTenant together @@ -1170,8 +1403,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, type.ordinal()); } // Encode coveredColumns.size() and whether or not this is a local index - WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? -1 : 1)); - for (ColumnReference ref : coveredColumns) { + WritableUtils.writeVInt(output, (coveredColumnsMap.size() + 1) * (isLocalIndex ? -1 : 1)); + for (ColumnReference ref : coveredColumnsMap.keySet()) { Bytes.writeByteArray(output, ref.getFamily()); Bytes.writeByteArray(output, ref.getQualifier()); } @@ -1186,8 +1419,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, indexedExpressions.size()); for (Expression expression : indexedExpressions) { - WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); - expression.write(output); + WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); + expression.write(output); } rowKeyMetaData.write(output); @@ -1196,6 +1429,76 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // Encode estimatedIndexRowKeyBytes and immutableRows together. WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * (immutableRows ? -1 : 1)); } + + public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer maintainer) throws IOException { + ServerCachingProtos.IndexMaintainer.Builder builder = ServerCachingProtos.IndexMaintainer.newBuilder(); + builder.setSaltBuckets(maintainer.nIndexSaltBuckets); + builder.setIsMultiTenant(maintainer.isMultiTenant); + if (maintainer.viewIndexId != null) { + builder.setViewIndexId(ByteStringer.wrap(maintainer.viewIndexId)); + } + for (ColumnReference colRef : maintainer.indexedColumns) { + ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier())); + builder.addIndexedColumns(cRefBuilder.build()); + } + for (PDataType dataType : maintainer.indexedColumnTypes) { + builder.addIndexedColumnTypeOrdinal(dataType.ordinal()); + } + for (Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) { + ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + ColumnReference dataTableColRef = e.getKey(); + cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier())); + builder.addDataTableColRefForCoveredColumns(cRefBuilder.build()); + if (maintainer.encodingScheme != NON_ENCODED_QUALIFIERS) { + // We need to serialize the colRefs of index tables only in case of encoded column names. + ColumnReference indexTableColRef = e.getValue(); + cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + cRefBuilder.setFamily(ByteStringer.wrap(indexTableColRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(indexTableColRef.getQualifier())); + builder.addIndexTableColRefForCoveredColumns(cRefBuilder.build()); + } + } + builder.setIsLocalIndex(maintainer.isLocalIndex); + builder.setIndexTableName(ByteStringer.wrap(maintainer.indexTableName)); + builder.setRowKeyOrderOptimizable(maintainer.rowKeyOrderOptimizable); + builder.setDataTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.dataEmptyKeyValueCF)); + ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder(); + ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get())); + ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength()); + ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset()); + builder.setEmptyKeyValueColFamily(ibwBuilder.build()); + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + DataOutput output = new DataOutputStream(stream); + for (Expression expression : maintainer.indexedExpressions) { + WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); + expression.write(output); + } + builder.setIndexedExpressions(ByteStringer.wrap(stream.toByteArray())); + } + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + DataOutput output = new DataOutputStream(stream); + maintainer.rowKeyMetaData.write(output); + builder.setRowKeyMetadata(ByteStringer.wrap(stream.toByteArray())); + } + builder.setNumDataTableColFamilies(maintainer.nDataCFs); + builder.setIndexWalDisabled(maintainer.indexWALDisabled); + builder.setIndexRowKeyByteSize(maintainer.estimatedIndexRowKeyBytes); + builder.setImmutable(maintainer.immutableRows); + for (Pair<String, String> p : maintainer.indexedColumnsInfo) { + ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder(); + if (p.getFirst() != null) { + ciBuilder.setFamilyName(p.getFirst()); + } + ciBuilder.setColumnName(p.getSecond()); + builder.addIndexedColumnInfo(ciBuilder.build()); + } + builder.setEncodingScheme(maintainer.encodingScheme.getSerializedMetadataValue()); + builder.setImmutableStorageScheme(maintainer.immutableStorageScheme.getSerializedMetadataValue()); + return builder.build(); + } public int getEstimatedByteSize() { int size = WritableUtils.getVIntSize(nIndexSaltBuckets); @@ -1212,8 +1515,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { PDataType type = indexedColumnTypes.get(i); size += WritableUtils.getVIntSize(type.ordinal()); } - size += WritableUtils.getVIntSize(coveredColumns.size()); - for (ColumnReference ref : coveredColumns) { + Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet(); + size += WritableUtils.getVIntSize(dataTableColRefs.size()); + for (ColumnReference ref : dataTableColRefs) { size += WritableUtils.getVIntSize(ref.getFamilyWritable().getSize()); size += ref.getFamily().length; size += WritableUtils.getVIntSize(ref.getQualifierWritable().getSize()); @@ -1241,24 +1545,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { * Init calculated state reading/creating */ private void initCachedState() { - dataEmptyKeyValueRef = - new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), - QueryConstants.EMPTY_COLUMN_BYTES); - - indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size()); - for (ColumnReference ref : coveredColumns) { - indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName( - ref.getFamily(), ref.getQualifier()))); - } - - this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size()); + byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst(); + dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier); + this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size()); // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size()); for (Expression expression : indexedExpressions) { KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() { @Override public Void visit(KeyValueColumnExpression expression) { - if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) { + if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) { indexedColumnTypes.add(expression.getDataType()); } return null; @@ -1267,7 +1563,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { expression.accept(visitor); } allColumns.addAll(indexedColumns); - allColumns.addAll(coveredColumns); + allColumns.addAll(coveredColumnsMap.keySet()); int dataPkOffset = (isDataTableSalted ? 1 : 0) + (isMultiTenant ? 1 : 0); int nIndexPkColumns = getIndexPkColumnCount(); @@ -1311,12 +1607,21 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } private int getIndexPkColumnCount() { - return dataRowKeySchema.getFieldCount() + indexedExpressions.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0); + return getIndexPkColumnCount(dataRowKeySchema, indexedExpressions.size(), isDataTableSalted, isMultiTenant); + } + + private static int getIndexPkColumnCount(RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) { + return rowKeySchema.getFieldCount() + numIndexExpressions - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0); } private RowKeyMetaData newRowKeyMetaData() { return getIndexPkColumnCount() < 0xFF ? new ByteSizeRowKeyMetaData() : new IntSizedRowKeyMetaData(); } + + private static RowKeyMetaData newRowKeyMetaData(IndexMaintainer i, RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) { + int indexPkColumnCount = getIndexPkColumnCount(rowKeySchema, numIndexExpressions, isDataTableSalted, isMultiTenant); + return indexPkColumnCount < 0xFF ? i.new ByteSizeRowKeyMetaData() : i.new IntSizedRowKeyMetaData(); + } private RowKeyMetaData newRowKeyMetaData(int capacity) { return capacity < 0xFF ? new ByteSizeRowKeyMetaData(capacity) : new IntSizedRowKeyMetaData(capacity); @@ -1523,4 +1828,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return udfParseNodes; } } + + public byte[] getEmptyKeyValueQualifier() { + return dataEmptyKeyValueRef.getQualifier(); + } + + public Set<Pair<String, String>> getIndexedColumnInfo() { + return indexedColumnsInfo; + } + + public ImmutableStorageScheme getIndexStorageScheme() { + return immutableStorageScheme; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java index 05a01b9..fcabdfd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java @@ -93,4 +93,5 @@ public class IndexMetaDataCacheClient { */ return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef); } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 56849fe..9edcafc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -47,10 +47,10 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException { + public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { // just use the standard keyvalue builder - this doesn't really need to be fast final List<IndexMaintainer> maintainers = - IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE); + IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer); final Transaction txn; try { txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index ae0a19f..5011245 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -167,7 +167,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { @Override public Void visit(KeyValueColumnExpression expression) { - get.addColumn(expression.getColumnFamily(), expression.getColumnName()); + get.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); estimatedSizeHolder[0]++; return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 9d2955b..4116101 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -38,6 +38,7 @@ import com.google.common.collect.Lists; */ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MD = "IdxMD"; + public static final String INDEX_PROTO_MD = "IdxProtoMD"; public static final String INDEX_UUID = "IdxUUID"; public static final String INDEX_MAINTAINERS = "IndexMaintainers"; private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index e515dbb..5da8be8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -226,4 +226,4 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } return indexTableNames; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index d22e957..39473dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -47,10 +47,15 @@ public class PhoenixIndexMetaData implements IndexMetaData { if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID); if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } - byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD); + boolean useProto = false; + byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD); + useProto = md != null; + if (md == null) { + md = attributes.get(PhoenixIndexCodec.INDEX_MD); + } byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); if (md != null) { - final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md); + final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto); final Transaction txn = MutationState.decodeTransaction(txState); return new IndexMetaDataCache() {