This is an automated email from the ASF dual-hosted git repository. larsh pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 78a5ab0 PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause. 78a5ab0 is described below commit 78a5ab06bb382a04974ca93c7a33aff0867c7132 Author: Lars <la...@apache.org> AuthorDate: Sat Mar 6 12:51:14 2021 -0800 PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause. --- .../apache/phoenix/end2end/index/LocalIndexIT.java | 84 ++++++++++++++++++---- .../org/apache/phoenix/compile/WhereCompiler.java | 35 ++++++--- .../coprocessor/BaseScannerRegionObserver.java | 2 + .../phoenix/iterate/BaseResultIterators.java | 33 ++++----- .../org/apache/phoenix/iterate/ExplainTable.java | 11 ++- .../phoenix/iterate/OrderedResultIterator.java | 5 ++ .../phoenix/iterate/RegionScannerFactory.java | 35 +++++++++ .../apache/phoenix/schema/types/PVarbinary.java | 2 +- 8 files changed, 165 insertions(+), 42 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index 276395e..a3f3ed1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -81,22 +81,77 @@ public class LocalIndexIT extends BaseLocalIndexIT { @Test public void testSelectFromIndexWithAdditionalWhereClause() throws Exception { + if (isNamespaceMapped) { + return; + } String tableName = schemaName + "." + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); Connection conn = getConnection(); conn.setAutoCommit(true); - if (isNamespaceMapped) { - conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); - } - conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT)"); + conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT, v3 INTEGER)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 2, 3, 4)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(2, 3, 4, 5)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(3, 4, 5, 6)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(4, 5, 6, 7)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)"); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 0.01, 1.0)"); - ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 0.1 and v2 < 10.0"); + testExtraWhere(conn, tableName); + + conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName); + conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v3)"); + testExtraWhere(conn, tableName); + + conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName); + conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v2)"); + testExtraWhere(conn, tableName); + + conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName); + conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v2,v3)"); + testExtraWhere(conn, tableName); + } + + private void testExtraWhere(Connection conn, String tableName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 3 AND v2 < 4"); + rs.next(); + assertEquals(1, rs.getInt(1)); + rs.close(); + + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 3 AND v3 < 5"); + rs.next(); + assertEquals(1, rs.getInt(1)); + rs.close(); + + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 10 AND v2 < 0 AND v3 < 0"); + rs.next(); + assertEquals(0, rs.getInt(1)); + rs.close(); + + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 <= 2 AND v2 > 0 AND v3 < 5"); rs.next(); assertEquals(1, rs.getInt(1)); rs.close(); + + rs = conn.createStatement().executeQuery("SELECT pk FROM "+tableName+" WHERE v1 > 3 AND v2 > 0 AND v3 > 6"); + rs.next(); + assertEquals(4, rs.getInt(1)); + rs.close(); + + rs = conn.createStatement().executeQuery("SELECT v1 FROM "+tableName+" WHERE v1 > 3 AND v2 > 0 AND v3 > 6"); + rs.next(); + assertEquals(5, rs.getInt(1)); + rs.close(); + + rs = conn.createStatement().executeQuery("SELECT pk FROM "+tableName+" WHERE (v1,v2) IN ((1,5),(4,5))"); + rs.next(); + assertEquals(3, rs.getInt(1)); + rs.close(); + + rs = conn.createStatement().executeQuery("SELECT v3 FROM "+tableName+" WHERE (v1,v2) IN ((1,5),(4,5))"); + rs.next(); + assertEquals(6, rs.getInt(1)); + rs.close(); } @Test @@ -231,13 +286,13 @@ public class LocalIndexIT extends BaseLocalIndexIT { QueryUtil.getExplainPlan(rs)); rs.close(); - // 4. Longer prefix on the index. - // Note: This cannot use the local index, see PHOENIX-6300 + // 4. Longer prefix on the index, use it. rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + tableName + " WHERE pk1 = 3 AND pk2 = 4 AND v1 = 3 AND v3 = 1"); assertEquals( "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " - + physicalTableName + " [3,4]\n" - + " SERVER FILTER BY (V1 = 3.0 AND V3 = 1)", + + physicalTableName + " [1,3,4,3]\n" + + " SERVER FILTER BY FIRST KEY ONLY AND \"V3\" = 1\n" + + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); rs.close(); } @@ -373,12 +428,13 @@ public class LocalIndexIT extends BaseLocalIndexIT { QueryUtil.getExplainPlan(rs)); rs.close(); - // 10. Cannot use index even when also filtering on non-indexed column, see PHOENIX-6400 + // 10. Use index even when also filtering on non-indexed column rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + tableName + " WHERE v2 = 2 AND v1 = 3"); assertEquals( - "CLIENT PARALLEL 1-WAY FULL SCAN OVER " - + indexPhysicalTableName + "\n" - + " SERVER FILTER BY (V2 = 2.0 AND V1 = 3.0)", + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + + indexPhysicalTableName + " [1,2]\n" + + " SERVER FILTER BY FIRST KEY ONLY AND \"V1\" = 3.0\n" + + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); rs.close(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 558be36..a157150 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -19,6 +19,9 @@ package org.apache.phoenix.compile; import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.Collections; @@ -30,11 +33,14 @@ import com.google.common.base.Optional; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.expression.AndExpression; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; @@ -201,16 +207,6 @@ public class WhereCompiler { return ref; } PTable table = ref.getTable(); - // If current table in the context is local index and table in column reference is global that - // means the column is not present in the local index. Local indexes do not currently support this. - // Throwing this exception here will cause this plan to be ignored when enumerating possible plans - // during the optimizing phase. - if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL - && (table.getIndexType() == null || table.getIndexType() == IndexType.GLOBAL)) { - String schemaNameStr = table.getSchemaName()==null?null:table.getSchemaName().getString(); - String tableNameStr = table.getTableName()==null?null:table.getTableName().getString(); - throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, ref.getColumn().getName().getString()); - } // Track if we need to compare KeyValue during filter evaluation // using column family. If the column qualifier is enough, we // just use that. @@ -282,6 +278,25 @@ public class WhereCompiler { if (LiteralExpression.isBooleanFalseOrNull(whereClause)) { context.setScanRanges(ScanRanges.NOTHING); + } else if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL) { + if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) { + // pass any extra where as scan attribute so it can be evaluated after all + // columns from the main CF have been merged in + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + DataOutputStream output = new DataOutputStream(stream); + WritableUtils.writeVInt(output, ExpressionType.valueOf(whereClause).ordinal()); + whereClause.write(output); + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER, stream.toByteArray()); + + // this is needed just for ExplainTable, since de-serializing an expression does not restore + // its display properties, and that cannot be changed, due to backwards compatibility + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER_STR, Bytes.toBytes(whereClause.toString())); + } } else if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) { Filter filter = null; final Counter counter = new Counter(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 2ad520f..7bb84f7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -97,6 +97,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { "_IndexRebuildDisableLoggingVerifyType"; public static final String INDEX_REBUILD_DISABLE_LOGGING_BEYOND_MAXLOOKBACK_AGE = "_IndexRebuildDisableLoggingBeyondMaxLookbackAge"; + public static final String LOCAL_INDEX_FILTER = "_LocalIndexFilter"; + public static final String LOCAL_INDEX_FILTER_STR = "_LocalIndexFilterStr"; /* * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation. diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 3023ad9..9190a33 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -405,6 +405,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme(); BitSet trackedColumnsBitset = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) && !hasDynamicColumns(table) ? new BitSet(10) : null; boolean filteredColumnNotInProjection = false; + for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) { byte[] filteredFamily = whereCol.getFirst(); if (!(familyMap.containsKey(filteredFamily))) { @@ -444,22 +445,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION; } } - for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { - ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey()); - NavigableSet<byte[]> qs = entry.getValue(); - NavigableSet<ImmutableBytesPtr> cols = null; - if (qs != null) { - cols = new TreeSet<ImmutableBytesPtr>(); - for (byte[] q : qs) { - cols.add(new ImmutableBytesPtr(q)); - if (trackedColumnsBitset != null) { - int qualifier = encodingScheme.decode(q); - trackedColumnsBitset.set(qualifier); - } - } - } - columnsTracker.put(cf, cols); - } // Making sure that where condition CFs are getting scanned at HRS. for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) { byte[] family = whereCol.getFirst(); @@ -492,6 +477,22 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } } + for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { + ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey()); + NavigableSet<byte[]> qs = entry.getValue(); + NavigableSet<ImmutableBytesPtr> cols = null; + if (qs != null) { + cols = new TreeSet<ImmutableBytesPtr>(); + for (byte[] q : qs) { + cols.add(new ImmutableBytesPtr(q)); + if (trackedColumnsBitset != null) { + int qualifier = encodingScheme.decode(q); + trackedColumnsBitset.set(qualifier); + } + } + } + columnsTracker.put(cf, cols); + } if (!columnsTracker.isEmpty()) { if (preventSeekToColumn) { for (ImmutableBytesPtr f : columnsTracker.keySet()) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java index 31713d9..cf5e021 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java @@ -178,10 +178,19 @@ public abstract class ExplainTable { } } while (filterIterator.hasNext()); } + String whereFilterStr = null; if (whereFilter != null) { + whereFilterStr = whereFilter.toString(); + } else { + byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER_STR); + if (expBytes != null) { + whereFilterStr = Bytes.toString(expBytes); + } + } + if (whereFilterStr != null) { String serverWhereFilter = "SERVER FILTER BY " + (firstKeyOnlyFilter == null ? "" : "FIRST KEY ONLY AND ") - + whereFilter.toString(); + + whereFilterStr; planSteps.add(" " + serverWhereFilter); if (explainPlanAttributesBuilder != null) { explainPlanAttributesBuilder.setServerWhereFilter(serverWhereFilter); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 13d75ea..bb0607c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -280,6 +280,11 @@ public class OrderedResultIterator implements PeekingResultIterator { final SizeAwareQueue<ResultEntry> queueEntries = ((RecordPeekingResultIterator)resultIterator).getQueueEntries(); long startTime = EnvironmentEdgeManager.currentTimeMillis(); for (Tuple result = delegate.next(); result != null; result = delegate.next()) { + // result might be empty if it was filtered by a local index + if (result.size() == 0) { + continue; + } + if (isDummy(result)) { dummyTuple = result; return resultIterator; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index 5b587df..6be45d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -43,9 +43,12 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; @@ -65,6 +68,8 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -126,6 +131,25 @@ public abstract class RegionScannerFactory { private byte[] actualStartKey = getActualStartKey(); private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); final long pageSizeMs = getPageSizeMsForRegionScanner(scan); + Expression extraWhere = null; + + { + // for local indexes construct the row filter for uncovered columns if it exists + if (ScanUtil.isLocalIndex(scan)) { + byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER); + if (expBytes != null) { + try { + ByteArrayInputStream stream = new ByteArrayInputStream(expBytes); + DataInputStream input = new DataInputStream(stream); + extraWhere = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); + extraWhere.readFields(input); + } catch (IOException io) { + // should not happen since we're reading from a byte[] + throw new RuntimeException(io); + } + } + } + } // Get the actual scan start row of local index. This will be used to compare the row // key of the results less than scan start row when there are references. @@ -207,6 +231,17 @@ public abstract class RegionScannerFactory { */ IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); + + if (extraWhere != null) { + Tuple merged = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : + new ResultTuple(Result.create(result)); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + extraWhere.evaluate(merged, ptr); + if (!Boolean.TRUE.equals(extraWhere.getDataType().toObject(ptr))) { + result.clear(); + return next; + } + } } if (projector != null) { Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java index b3ce57a..83909c4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java @@ -142,7 +142,7 @@ public class PVarbinary extends PBinaryBase { StringBuilder buf = new StringBuilder(); buf.append('['); if (length > 0) { - for (int i = o; i < length; i++) { + for (int i = o; i < o + length; i++) { buf.append(0xFF & b[i]); buf.append(','); }