http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java index 00ece40..15a9f74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java @@ -26,6 +26,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; import org.apache.phoenix.expression.Expression; @@ -80,6 +81,11 @@ public abstract class CloneExpressionVisitor extends TraverseAllExpressionVisito public Expression visit(KeyValueColumnExpression node) { return node; } + + @Override + public Expression visit(ArrayColumnExpression node) { + return node; + } @Override public Expression visit(ProjectedColumnExpression node) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java index 31f340d..100f099 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java @@ -27,6 +27,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; import org.apache.phoenix.expression.Expression; @@ -113,6 +114,7 @@ public interface ExpressionVisitor<E> { public E visit(LiteralExpression node); public E visit(RowKeyColumnExpression node); public E visit(KeyValueColumnExpression node); + public E visit(ArrayColumnExpression node); public E visit(ProjectedColumnExpression node); public E visit(SequenceValueExpression node); http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java new file mode 100644 index 0000000..7ca6d9e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.expression.visitor; + +import java.util.List; + +import org.apache.phoenix.expression.ArrayColumnExpression; +import org.apache.phoenix.expression.Expression; + +public class ReplaceArrayColumnWithKeyValueColumnExpressionVisitor extends CloneExpressionVisitor { + + @Override + public boolean isCloneNode(Expression node, List<Expression> children) { + return !children.equals(node.getChildren()); + } + + @Override + public Expression visit(ArrayColumnExpression node) { + return node.getKeyValueExpression(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java index 3b7067a..9e50bc4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java @@ -26,9 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; -import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -121,6 +121,11 @@ public class StatelessTraverseAllExpressionVisitor<E> extends TraverseAllExpress } @Override + public E visit(ArrayColumnExpression node) { + return null; + } + + @Override public E visit(ProjectedColumnExpression node) { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java index 83b28bd..1a2f2cc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java @@ -26,9 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; -import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -114,6 +114,11 @@ public class StatelessTraverseNoExpressionVisitor<E> extends TraverseNoExpressio public E visit(RowKeyColumnExpression node) { return null; } + + @Override + public E visit(ArrayColumnExpression node) { + return null; + } @Override public E visit(KeyValueColumnExpression node) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java index b8b0350..d1f6211 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.EncodedColumnsUtil; /** * When selecting specific columns in a SELECT query, this filter passes only selected columns @@ -53,6 +53,8 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { private byte[] emptyCFName; private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker; private Set<byte[]> conditionOnlyCfs; + private boolean usesEncodedColumnNames; + private byte[] emptyKVQualifier; public ColumnProjectionFilter() { @@ -60,10 +62,12 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { public ColumnProjectionFilter(byte[] emptyCFName, Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker, - Set<byte[]> conditionOnlyCfs) { + Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) { this.emptyCFName = emptyCFName; this.columnsTracker = columnsTracker; this.conditionOnlyCfs = conditionOnlyCfs; + this.usesEncodedColumnNames = usesEncodedColumnNames; + this.emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); } @Override @@ -87,6 +91,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { familyMapSize--; } int conditionOnlyCfsSize = WritableUtils.readVInt(input); + usesEncodedColumnNames = conditionOnlyCfsSize > 0; + emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); + conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore to the actual value. this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); while (conditionOnlyCfsSize > 0) { this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input)); @@ -110,12 +117,13 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { } } } - // Write conditionOnlyCfs - WritableUtils.writeVInt(output, this.conditionOnlyCfs.size()); + // Encode usesEncodedColumnNames in conditionOnlyCfs size. + WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * (usesEncodedColumnNames ? 1 : -1)); for (byte[] f : this.conditionOnlyCfs) { WritableUtils.writeCompressedByteArray(output, f); } - } + +} @Override public byte[] toByteArray() throws IOException { @@ -153,9 +161,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { // make sure we're not holding to any of the byte[]'s ptr.set(HConstants.EMPTY_BYTE_ARRAY); if (kvs.isEmpty()) { - kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName, - 0, this.emptyCFName.length, QueryConstants.EMPTY_COLUMN_BYTES, 0, - QueryConstants.EMPTY_COLUMN_BYTES.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0)); + kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(), + this.emptyCFName, 0, this.emptyCFName.length, emptyKVQualifier, 0, + emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java index dba700b..5909286 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java @@ -26,6 +26,7 @@ import java.util.TreeSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.visitor.ExpressionVisitor; @@ -94,7 +95,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil refCount = foundColumns.size(); } - public ReturnCode resolveColumn(Cell value) { + private ReturnCode resolveColumn(Cell value) { // Always set key, in case we never find a key value column of interest, // and our expression uses row key columns. setKey(value); @@ -184,7 +185,12 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { @Override public Void visit(KeyValueColumnExpression expression) { - inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnName()); + inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); + return null; + } + @Override + public Void visit(ArrayColumnExpression expression) { + inputTuple.addColumn(expression.getArrayExpression().getColumnFamily(), expression.getArrayExpression().getColumnQualifier()); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java index 0d904bc..195c89c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java @@ -47,7 +47,8 @@ public class SingleCQKeyValueComparisonFilter extends SingleKeyValueComparisonFi public static SingleCQKeyValueComparisonFilter parseFrom(final byte [] pbBytes) throws DeserializationException { try { - return (SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new SingleCQKeyValueComparisonFilter()); + SingleCQKeyValueComparisonFilter writable = (SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new SingleCQKeyValueComparisonFilter()); + return writable; } catch (IOException e) { throw new DeserializationException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java index eaf8d35..527b948 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java @@ -22,11 +22,13 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; import org.apache.phoenix.expression.visitor.TraverseAllExpressionVisitor; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; @@ -58,7 +60,13 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi @Override public Void visit(KeyValueColumnExpression expression) { cf = expression.getColumnFamily(); - cq = expression.getColumnName(); + cq = expression.getColumnQualifier(); + return null; + } + @Override + public Void visit(ArrayColumnExpression expression) { + cf = expression.getArrayExpression().getColumnFamily(); + cq = expression.getArrayExpression().getColumnQualifier(); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java index bcadc2b..19797cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java @@ -35,4 +35,5 @@ public interface ValueGetter { public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException; public byte[] getRowKey(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index 6f9caa6..0f960e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -32,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.scanner.Scanner; import com.google.common.collect.Lists; -import com.google.common.collect.Lists; /** * http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 6595562..b1dd5f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hbase.Cell; @@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.ColumnResolver; @@ -51,10 +53,14 @@ import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexExpressionCompiler; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.expression.CoerceExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; +import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; +import org.apache.phoenix.expression.visitor.ReplaceArrayColumnWithKeyValueColumnExpressionVisitor; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -67,14 +73,16 @@ import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor; import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.AmbiguousColumnException; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SaltingUtil; @@ -82,17 +90,22 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ExpressionUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; import org.apache.tephra.TxConstants; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -276,8 +289,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; private Set<ColumnReference> coveredColumns; - // Map used to cache column family of data table and the corresponding column family for the local index - private Map<ImmutableBytesPtr, ImmutableBytesWritable> dataTableLocalIndexFamilyMap; + // Information for columns of data tables that are being indexed. The first part of the pair is column family and second part is the column name. + private Set<Pair<String, String>> indexedColumnsInfo; + // Information for columns of data tables that are being covered by the index. The first part of the pair is column family and second part is the column name. + private Set<Pair<String, String>> coveredColumnsInfo; + // Map of covered columns where a key is column reference for a column in the data table + // and value is column reference for corresponding column in the index table. + // TODO: samarth confirm that we don't need a separate map for tracking column families of local indexes. + private Map<ColumnReference, ColumnReference> coveredColumnsMap; // columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -291,39 +310,46 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private boolean indexWALDisabled; private boolean isLocalIndex; private boolean immutableRows; + private boolean storeColsInSingleCell; // Transient state private final boolean isDataTableSalted; private final RowKeySchema dataRowKeySchema; - private List<ImmutableBytesPtr> indexQualifiers; private int estimatedIndexRowKeyBytes; private int estimatedExpressionSize; private int[] dataPkPosition; private int maxTrailingNulls; private ColumnReference dataEmptyKeyValueRef; private boolean rowKeyOrderOptimizable; + private boolean usesEncodedColumnNames; + private ImmutableBytesPtr emptyKeyValueQualifierPtr; private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) { this.dataRowKeySchema = dataRowKeySchema; this.isDataTableSalted = isDataTableSalted; } - private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) { + private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null); assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW); this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable(); this.isMultiTenant = dataTable.isMultiTenant(); this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; - + /* + * There is nothing to prevent new indexes on existing tables to have encoded column names. + * Except, due to backward compatibility reasons, we aren't able to change IndexMaintainer and the state + * that is serialized in it. Because of this we are forced to have the indexes inherit the + * storage scheme of the parent data tables. + */ + this.usesEncodedColumnNames = EncodedColumnsUtil.usesEncodedColumnNames(dataTable); byte[] indexTableName = index.getPhysicalName().getBytes(); // Use this for the nDataSaltBuckets as we need this for local indexes // TODO: persist nDataSaltBuckets separately, but maintain b/w compat. Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum(); boolean indexWALDisabled = index.isWALDisabled(); int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1); -// int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0; int nIndexColumns = index.getColumns().size() - indexPosOffset; int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset; // number of expressions that are indexed that are not present in the row key of the data table @@ -334,7 +360,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); try { - PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName); + PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName); if (SchemaUtil.isPKColumn(dataColumn)) continue; } catch (ColumnNotFoundException e) { @@ -367,7 +393,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns); - this.dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns); + this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns); this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index); @@ -376,6 +402,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // TODO: check whether index is immutable or not. Currently it's always false so checking // data table is with immutable rows or not. this.immutableRows = dataTable.isImmutableRows(); + this.storeColsInSingleCell = index.getStorageScheme() == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL; int indexColByteSize = 0; ColumnResolver resolver = null; List<ParseNode> parseNodes = new ArrayList<ParseNode>(1); @@ -397,6 +424,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver); + this.indexedColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); + this.coveredColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); + IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context); for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) { PColumn indexColumn = index.getPKColumns().get(i); @@ -409,12 +439,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } if ( expressionIndexCompiler.getColumnRef()!=null ) { - // get the column of the data table that corresponds to this index column + // get the column of the data column that corresponds to this index column PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); boolean isPKColumn = SchemaUtil.isPKColumn(column); if (isPKColumn) { int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0); this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos); + indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString())); } else { indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; try { @@ -424,6 +455,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { expression = CoerceExpression.create(expression, indexColumn.getDataType()); } this.indexedExpressions.add(expression); + indexedColumnsInfo.add(new Pair<>(column.getFamilyName().getString(), column.getName().getString())); } catch (SQLException e) { throw new RuntimeException(e); // Impossible } @@ -432,6 +464,45 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { else { indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; this.indexedExpressions.add(expression); + KeyValueExpressionVisitor kvVisitor = new KeyValueExpressionVisitor() { + @Override + public Void visit(KeyValueColumnExpression colExpression) { + return addDataColInfo(dataTable, colExpression); + } + + @Override + public Void visit(ArrayColumnExpression expression) { + return addDataColInfo(dataTable, expression); + } + + private Void addDataColInfo(final PTable dataTable, Expression expression) { + Preconditions.checkArgument(expression instanceof ArrayColumnExpression + || expression instanceof KeyValueColumnExpression); + + KeyValueColumnExpression colExpression = null; + if (expression instanceof ArrayColumnExpression) { + colExpression = + ((ArrayColumnExpression) expression).getKeyValueExpression(); + } else { + colExpression = ((KeyValueColumnExpression) expression); + } + byte[] cf = colExpression.getColumnFamily(); + byte[] cq = colExpression.getColumnQualifier(); + try { + PColumn dataColumn = + cf == null ? dataTable.getPColumnForColumnQualifier(null, cq) + : dataTable.getColumnFamily(cf) + .getPColumnForColumnQualifier(cq); + indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName() + .getString(), dataColumn.getName().getString())); + } catch (ColumnNotFoundException | ColumnFamilyNotFoundException + | AmbiguousColumnException e) { + throw new RuntimeException(e); + } + return null; + } + }; + expression.accept(kvVisitor); } // set the sort order of the expression correctly if (indexColumn.getSortOrder() == SortOrder.DESC) { @@ -442,19 +513,20 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (int i = 0; i < index.getColumnFamilies().size(); i++) { PColumnFamily family = index.getColumnFamilies().get(i); for (PColumn indexColumn : family.getColumns()) { - PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); - PName dataTableFamily = column.getFamilyName(); - this.coveredColumns.add(new ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes())); - if(isLocalIndex) { - this.dataTableLocalIndexFamilyMap.put(new ImmutableBytesPtr(dataTableFamily.getBytes()), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString())))); - } + PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); + byte[] dataColumnCq = EncodedColumnsUtil.getColumnQualifier(dataColumn, dataTable); + byte[] indexColumnCq = EncodedColumnsUtil.getColumnQualifier(indexColumn, index); + this.coveredColumns.add(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq)); + this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), + new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq)); + this.coveredColumnsInfo.add(new Pair<>(dataColumn.getFamilyName().getString(), dataColumn.getName().getString())); } } this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize); initCachedState(); } - - public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey) { + + public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, boolean convertArrayColToKeyValueCol) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); boolean prependRegionStartKey = isLocalIndex && regionStartKey != null; boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0; @@ -523,6 +595,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { SortOrder dataSortOrder; if (dataPkPosition[i] == EXPRESSION_NOT_PRESENT) { Expression expression = expressionIterator.next(); + if (convertArrayColToKeyValueCol) { + expression = expression.accept(new ReplaceArrayColumnWithKeyValueColumnExpressionVisitor()); + } dataColumnType = expression.getDataType(); dataSortOrder = expression.getSortOrder(); isNullable = expression.isNullable(); @@ -855,36 +930,84 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return indexRowKeySchema; } - public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { + public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey, boolean convertArrayColToKeyValueCol) throws IOException { Put put = null; // New row being inserted: add the empty key value if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) { - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, convertArrayColToKeyValueCol); put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts, + this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts, // set the value to the empty column name - QueryConstants.EMPTY_COLUMN_BYTES_PTR)); + emptyKeyValueQualifierPtr)); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - int i = 0; - for (ColumnReference ref : this.getCoveredColumns()) { - ImmutableBytesPtr cq = this.indexQualifiers.get(i++); - ImmutableBytesWritable value = valueGetter.getLatestValue(ref); - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); - if (value != null) { + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, convertArrayColToKeyValueCol); + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); + if (storeColsInSingleCell) { + // map from column family to list of columns (for covered columns) + Map<String, List<ColumnReference>> familyToColListMap = Maps.newHashMap(); + for (ColumnReference ref : this.getCoveredColumns()) { + String cf = Bytes.toString(ref.getFamily()); + if (!familyToColListMap.containsKey(cf)) { + familyToColListMap.put(cf, Lists.<ColumnReference>newArrayList()); + } + familyToColListMap.get(cf).add(ref); + } + // iterate over each column family and create a byte[] containing all the columns + for (Entry<String, List<ColumnReference>> entry : familyToColListMap.entrySet()) { + byte[] columnFamily = entry.getKey().getBytes(); + List<ColumnReference> colRefs = entry.getValue(); + int maxIndex = Integer.MIN_VALUE; + // find the max col qualifier + for (ColumnReference colRef : colRefs) { + byte[] qualifier = this.coveredColumnsMap.get(colRef).getQualifier(); + maxIndex = Math.max(maxIndex, PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.getDefault())); + } + byte[][] colValues = new byte[maxIndex+1][]; + // set the values of the columns + for (ColumnReference colRef : colRefs) { + ImmutableBytesWritable value = valueGetter.getLatestValue(colRef); + if (value != null) { + byte[] qualifier = this.coveredColumnsMap.get(colRef).getQualifier(); + int index = PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.getDefault()); + colValues[index] = value.get(); + } + } + + List<Expression> children = Lists.newArrayListWithExpectedSize(colRefs.size()); + // create an expression list with all the columns + for (int j=0; j<colValues.length; ++j) { + children.add(new LiteralExpression(colValues[j]==null ? ByteUtil.EMPTY_BYTE_ARRAY : colValues[j] )); + } + // we use ArrayConstructorExpression to serialize multiple columns into a single byte[] + // construct the ArrayConstructorExpression with a variable length data type (PVarchar) since columns can be of fixed or variable length + ArrayConstructorExpression arrayExpression = new ArrayConstructorExpression(children, PVarchar.INSTANCE, rowKeyOrderOptimizable); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + arrayExpression.evaluate(new BaseTuple() {}, ptr); if (put == null) { put = new Put(indexRowKey); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } + ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily); //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC - if(this.isLocalIndex) { - ImmutableBytesWritable localIndexColFamily = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()); - put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, cq, ts, value)); - } else { - put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); + put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, colFamilyPtr, ts, ptr)); + } + } + else { + for (ColumnReference ref : this.getCoveredColumns()) { + //FIXME: samarth figure out a backward compatible way to handle this as coveredcolumnsmap won't be availble for older phoenix clients. + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); + ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); + ImmutableBytesWritable value = valueGetter.getLatestValue(ref); + if (value != null) { + if (put == null) { + put = new Put(indexRowKey); + put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); + } + put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); } } } @@ -964,7 +1087,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { @SuppressWarnings("deprecation") public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { - byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey); + byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey, false); // Delete the entire row if any of the indexed columns changed DeleteType deleteType = null; if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row @@ -973,14 +1096,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (ColumnReference ref : getCoveredColumns()) { byte[] family = ref.getFamily(); - if (this.isLocalIndex) { - family = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get(); - } + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { - delete.deleteFamilyVersion(family, ts); + delete.deleteFamilyVersion(indexColumn.getFamily(), ts); } else { - delete.deleteFamily(family, ts); + delete.deleteFamily(indexColumn.getFamily(), ts); } } if (deleteType == DeleteType.SINGLE_VERSION) { @@ -1001,12 +1122,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - byte[] family = this.isLocalIndex ? this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : ref.getFamily(); + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If point delete for data table, then use point delete for index as well - if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { - delete.deleteColumn(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { + //FIXME: samarth change this. Index column qualifiers are not derivable from data table cqs. + // Figure out a backward compatible way of going this since coveredColumnsMap won't be available + // for older clients. + delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } else { - delete.deleteColumns(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } } } @@ -1061,15 +1185,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0; int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1; coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns); - dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); + coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); for (int i = 0; i < nCoveredColumns; i++) { - byte[] cf = Bytes.readByteArray(input); - byte[] cq = Bytes.readByteArray(input); - ColumnReference ref = new ColumnReference(cf,cq); - coveredColumns.add(ref); - if(isLocalIndex) { - dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))))); - } + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + byte[] indexTableCf = Bytes.readByteArray(input); + byte[] indexTableCq = Bytes.readByteArray(input); + ColumnReference dataColumn = new ColumnReference(dataTableCf, dataTableCq); + coveredColumns.add(dataColumn); + ColumnReference indexColumn = new ColumnReference(indexTableCf, indexTableCq); + coveredColumnsMap.put(dataColumn, indexColumn); } // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); @@ -1095,6 +1220,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { if (isNewClient) { int numIndexedExpressions = WritableUtils.readVInt(input); + usesEncodedColumnNames = numIndexedExpressions > 0; + numIndexedExpressions = Math.abs(numIndexedExpressions) - 1; indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions); for (int i = 0; i < numIndexedExpressions; i++) { Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); @@ -1148,6 +1275,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int encodedEstimatedIndexRowKeyBytesAndImmutableRows = WritableUtils.readVInt(input); this.immutableRows = encodedEstimatedIndexRowKeyBytesAndImmutableRows < 0; this.estimatedIndexRowKeyBytes = Math.abs(encodedEstimatedIndexRowKeyBytesAndImmutableRows); + int numCols = WritableUtils.readVInt(input); + //TODO: samarth figure out a backward compatible way of reading/writing indexedColumnsInfo + indexedColumnsInfo = Sets.newHashSetWithExpectedSize(numCols); + for (int i = 1; i <= numCols; i++) { + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + indexedColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), Bytes.toString(dataTableCq))); + } + coveredColumnsInfo = Sets.newHashSetWithExpectedSize(numCols); + int numCoveredCols = WritableUtils.readVInt(input); + for (int i = 1; i <= numCoveredCols; i++) { + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + coveredColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), Bytes.toString(dataTableCq))); + } + storeColsInSingleCell = WritableUtils.readVInt(input) > 0; initCachedState(); } @@ -1171,9 +1314,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } // Encode coveredColumns.size() and whether or not this is a local index WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? -1 : 1)); - for (ColumnReference ref : coveredColumns) { - Bytes.writeByteArray(output, ref.getFamily()); - Bytes.writeByteArray(output, ref.getQualifier()); + for (Entry<ColumnReference, ColumnReference> ref : coveredColumnsMap.entrySet()) { + ColumnReference dataColumn = ref.getKey(); + ColumnReference indexColumn = ref.getValue(); + Bytes.writeByteArray(output, dataColumn.getFamily()); + Bytes.writeByteArray(output, dataColumn.getQualifier()); + Bytes.writeByteArray(output, indexColumn.getFamily()); + Bytes.writeByteArray(output, indexColumn.getQualifier()); } // TODO: remove when rowKeyOrderOptimizable hack no longer needed WritableUtils.writeVInt(output,indexTableName.length * (rowKeyOrderOptimizable ? 1 : -1)); @@ -1184,7 +1331,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength()); output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength()); - WritableUtils.writeVInt(output, indexedExpressions.size()); + // Hack to encode usesEncodedColumnNames in indexedExpressions size. + int indexedExpressionsSize = (indexedExpressions.size() + 1) * (usesEncodedColumnNames ? 1 : -1); + WritableUtils.writeVInt(output, indexedExpressionsSize); for (Expression expression : indexedExpressions) { WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); expression.write(output); @@ -1195,6 +1344,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? -1 : 1)); // Encode estimatedIndexRowKeyBytes and immutableRows together. WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * (immutableRows ? -1 : 1)); + WritableUtils.writeVInt(output, indexedColumnsInfo.size()); + for (Pair<String, String> colInfo : indexedColumnsInfo) { + Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : colInfo.getFirst().getBytes()); + Bytes.writeByteArray(output, colInfo.getSecond().getBytes()); + } + WritableUtils.writeVInt(output, coveredColumnsInfo.size()); + for (Pair<String, String> colInfo : coveredColumnsInfo) { + Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : colInfo.getFirst().getBytes()); + Bytes.writeByteArray(output, colInfo.getSecond().getBytes()); + } + WritableUtils.writeVInt(output, storeColsInSingleCell ? 1 : -1); } public int getEstimatedByteSize() { @@ -1241,16 +1401,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { * Init calculated state reading/creating */ private void initCachedState() { - dataEmptyKeyValueRef = - new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), - QueryConstants.EMPTY_COLUMN_BYTES); - - indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size()); - for (ColumnReference ref : coveredColumns) { - indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName( - ref.getFamily(), ref.getQualifier()))); - } - + byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); + dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier); + emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier); this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size()); // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size()); @@ -1258,11 +1411,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() { @Override public Void visit(KeyValueColumnExpression expression) { - if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) { + if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) { indexedColumnTypes.add(expression.getDataType()); } return null; } + @Override + public Void visit(ArrayColumnExpression expression) { + KeyValueColumnExpression colExpression = expression.getArrayExpression(); + if (indexedColumns.add(new ColumnReference(colExpression.getColumnFamily(), colExpression.getColumnQualifier()))) { + indexedColumnTypes.add(colExpression.getDataType()); + } + return null; + } }; expression.accept(visitor); } @@ -1523,4 +1684,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return udfParseNodes; } } + + public byte[] getEmptyKeyValueQualifier() { + return emptyKeyValueQualifierPtr.copyBytes(); + } + + public Set<Pair<String, String>> getCoveredColumnInfo() { + return coveredColumnsInfo; + } + + public Set<Pair<String, String>> getIndexedColumnInfo() { + return indexedColumnsInfo; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 9d2955b..b1454b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -74,7 +74,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion() .getTableDesc().getName() : maintainer.getIndexTableName()); Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env - .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); + .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), false); indexUpdate.setUpdate(put); indexUpdates.add(indexUpdate); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index c67da6e..9ee5ea7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -67,7 +67,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; @@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { for (ColumnReference ref : mutableColumns) { scan.addColumn(ref.getFamily(), ref.getQualifier()); } + /* + * Indexes inherit the storage scheme of the data table which means all the indexes have the same + * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start + * supporting new indexes over existing data tables to have a different storage scheme than the data + * table. + */ + byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier(); + // Project empty key value column - scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); scanRanges.initializeScan(scan); TableName tableName = env.getRegion().getRegionInfo().getTable(); @@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException { if (scanner != null) { Result result; - ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0) + .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier()); // Process existing data table rows by removing the old index row and adding the new index row while ((result = scanner.next()) != null) { Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); @@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // to generate point delete markers for all index rows that were added. We don't have Tephra // manage index rows in change sets because we don't want to be hit with the additional // memory hit and do not need to do conflict detection on index rows. - ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier()); while ((result = scanner.next()) != null) { Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 2685b93..0d6d881 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -17,12 +17,16 @@ */ package org.apache.phoenix.iterate; +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; +import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; +import static org.apache.phoenix.util.ScanUtil.setMinMaxQualifiersOnScan; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -49,6 +53,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.HConstants; + +import javax.management.Query; + import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -82,14 +89,17 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.PTableStats; import org.apache.phoenix.schema.stats.StatisticsUtil; +import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PrefixByteCodec; import org.apache.phoenix.util.PrefixByteDecoder; @@ -208,7 +218,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // Project empty key value unless the column family containing it has // been projected in its entirety. if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { - scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); + scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst()); } } } @@ -226,7 +236,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if(offset!=null){ ScanUtil.addOffsetAttribute(scan, offset); } - int cols = plan.getGroupBy().getOrderPreservingColumnCount(); if (cols > 0 && keyOnlyFilter && !plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) && @@ -238,13 +247,77 @@ public abstract class BaseResultIterators extends ExplainTable implements Result new DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(), cols)); } - + //TODO: samarth add condition to not do position based look ups in case of joins so that we won't need to do the hacky check inside co-processors. + if (setMinMaxQualifiersOnScan(table)) { + Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiers(scan, context); + if (minMaxQualifiers != null) { + scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getFirst())); + scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getSecond())); + } + } if (optimizeProjection) { optimizeProjection(context, scan, table, statement); } } } - + + private static Pair<Integer, Integer> getMinMaxQualifiers(Scan scan, StatementContext context) { + PTable table = context.getCurrentTable().getTable(); + StorageScheme storageScheme = table.getStorageScheme(); + checkArgument(EncodedColumnsUtil.usesEncodedColumnNames(storageScheme), "Method should only be used for tables using encoded column names"); + Integer minQualifier = null; + Integer maxQualifier = null; + boolean emptyKVProjected = false; + for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) { + byte[] cq = whereCol.getSecond(); + if (cq != null) { + int qualifier = (Integer)PInteger.INSTANCE.toObject(cq); + if (qualifier == ENCODED_EMPTY_COLUMN_NAME) { + emptyKVProjected = true; + continue; + } + if (minQualifier == null && maxQualifier == null) { + minQualifier = maxQualifier = qualifier; + } else { + if (qualifier < minQualifier) { + minQualifier = qualifier; + } else if (qualifier > maxQualifier) { + maxQualifier = qualifier; + } + } + } + } + Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); + for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { + if (entry.getValue() != null) { + for (byte[] cq : entry.getValue()) { + if (cq != null) { + int qualifier = (Integer)PInteger.INSTANCE.toObject(cq); + if (qualifier == ENCODED_EMPTY_COLUMN_NAME) { + emptyKVProjected = true; + continue; + } + if (minQualifier == null && maxQualifier == null) { + minQualifier = maxQualifier = qualifier; + } else { + if (qualifier < minQualifier) { + minQualifier = qualifier; + } else if (qualifier > maxQualifier) { + maxQualifier = qualifier; + } + } + } + } + } + } + if (minQualifier == null && emptyKVProjected) { + return new Pair<>(ENCODED_EMPTY_COLUMN_NAME, ENCODED_EMPTY_COLUMN_NAME); + } else if (minQualifier == null) { + return null; + } + return new Pair<>(minQualifier, maxQualifier); + } + private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); // columnsTracker contain cf -> qualifiers which should get returned. @@ -341,7 +414,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // the ExplicitColumnTracker not to be used, though. if (!statement.isAggregate() && filteredColumnNotInProjection) { ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), - columnsTracker, conditionOnlyCfs)); + columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getStorageScheme()))); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java index 3293f65..1e5f09e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java @@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator { }; } - private final static Tuple UNINITIALIZED = new ResultTuple(); + private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE; private Tuple next = UNINITIALIZED; abstract protected Tuple advance() throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java index 8ada952..135ab26 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java @@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { return this.index; } + @Override public int size() { if (flushBuffer) return flushedCount; http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 8dcb2e8..e4c52c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.SizedUtil; import com.google.common.base.Function; @@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator { } this.byteSize = queueEntries.getByteSize(); } catch (IOException e) { - throw new SQLException("", e); + ServerUtil.createIOException(e.getMessage(), e); } finally { delegate.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index 88e141a..531bbe7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -24,16 +24,24 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; public class RegionScannerResultIterator extends BaseResultIterator { private final RegionScanner scanner; + private final Pair<Integer, Integer> minMaxQualifiers; + private final boolean useQualifierAsIndex; - public RegionScannerResultIterator(RegionScanner scanner) { + public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, boolean isJoin) { this.scanner = scanner; + this.useQualifierAsIndex = ScanUtil.useQualifierAsIndex(minMaxQualifiers, isJoin); + this.minMaxQualifiers = minMaxQualifiers; } @Override @@ -43,7 +51,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { synchronized (scanner) { try { // TODO: size - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return value of s.next is false // since this is an indication of whether or not there are more values after the // ones returned @@ -53,7 +61,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { } // We instantiate a new tuple because in all cases currently we hang on to it // (i.e. to compute and hold onto the TopN). - MultiKeyValueTuple tuple = new MultiKeyValueTuple(); + Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); tuple.setKeyValues(results); return tuple; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 5a4a791..e4b32b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT); public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP"; public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP); - + public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY; public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; @@ -315,6 +315,13 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { /** Version below which we fall back on the generic KeyValueBuilder */ public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14"); + public static final String STORAGE_SCHEME = "STORAGE_SCHEME"; + public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(STORAGE_SCHEME); + public static final String ENCODED_COLUMN_QUALIFIER = "COLUMN_QUALIFIER"; + public static final byte[] ENCODED_COLUMN_QUALIFIER_BYTES = Bytes.toBytes(ENCODED_COLUMN_QUALIFIER); + public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER"; + public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER); + PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException { this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false)); this.connection = connection; @@ -588,9 +595,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { newCells.addAll(cells); newCells.add(kv); Collections.sort(newCells, KeyValue.COMPARATOR); - resultTuple.setResult(Result.create(newCells)); + tuple = new ResultTuple(Result.create(newCells)); } - return tuple; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 47c17ae..3ca48a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { private final static String STRING_FALSE = "0"; private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0); private final static Integer INTEGER_FALSE = Integer.valueOf(0); - private final static Tuple BEFORE_FIRST = new ResultTuple(); + private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE; private final ResultIterator scanner; private final RowProjector rowProjector; http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 908a117..2d7550a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -122,6 +122,7 @@ public class HashCacheFactory implements ServerCacheFactory { int resultSize = (int)Bytes.readVLong(hashCacheByteArray, offset); offset += WritableUtils.decodeVIntSize(hashCacheByteArray[offset]); ImmutableBytesWritable value = new ImmutableBytesWritable(hashCacheByteArray,offset,resultSize); + //TODO: samarth make joins work with position look up. Tuple result = new ResultTuple(ResultUtil.toResult(value)); ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, onExpressions); List<Tuple> tuples = hashCacheMap.get(key); http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java index b12326a..a6a57c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -49,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -208,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri not care about it */ private void initColumnIndexes() throws SQLException { - columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR); + columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR); int columnIndex = 0; for(int index = 0; index < logicalNames.size(); index++) { PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); @@ -216,18 +217,23 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); byte[] family = new byte[0]; - if (c.getFamilyName() != null) // Skip PK column + byte[] cq; + if (!SchemaUtil.isPKColumn(c)) { family = c.getFamilyName().getBytes(); - byte[] name = c.getName().getBytes(); - byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); + cq = EncodedColumnsUtil.getColumnQualifier(c, table); + } else { + // TODO: samarth verify if this is the right thing to do here. + cq = c.getName().getBytes(); + } + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); if (!columnIndexes.containsKey(cfn)) { columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } } byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); - byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, - QueryConstants.EMPTY_COLUMN_BYTES); + byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue); columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } @@ -243,9 +249,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri private int findIndex(Cell cell) throws IOException { byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); - byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), + byte[] cq = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); + byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); if(columnIndexes.containsKey(cfn)) { return columnIndexes.get(cfn); }