This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch 5.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push: new 931a3e7743 PHOENIX-7229 Leverage bloom filters for single key point lookups (#1832) 931a3e7743 is described below commit 931a3e7743c47696cb14fb94de23070f841e0bcf Author: tkhurana <khurana.ta...@gmail.com> AuthorDate: Sat Feb 24 21:20:15 2024 -0800 PHOENIX-7229 Leverage bloom filters for single key point lookups (#1832) Co-authored-by: Viraj Jasani <vjas...@apache.org> --- .../phoenix/iterate/BaseResultIterators.java | 17 +- phoenix-core/pom.xml | 5 + .../org/apache/phoenix/end2end/BloomFilterIT.java | 244 +++++++++++++++++++++ .../apache/phoenix/compile/WhereOptimizerTest.java | 50 +++++ .../java/org/apache/phoenix/util/TestUtil.java | 5 + 5 files changed, 320 insertions(+), 1 deletion(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 5fda525316..359cf2255b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -37,6 +37,7 @@ import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.EOFException; +import java.io.IOException; import java.sql.SQLException; import java.util.*; import java.util.Arrays; @@ -72,6 +73,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.coprocessorclient.HashJoinCacheNotFoundException; import org.apache.phoenix.coprocessorclient.UngroupedAggregateRegionObserverHelper; +import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.ScanPlan; @@ -936,7 +938,20 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if (!isLocalIndex && scanRanges.isPointLookup() && !scanRanges.useSkipScanFilter()) { List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(1); List<Scan> scans = Lists.newArrayListWithExpectedSize(1); - scans.add(context.getScan()); + Scan scanFromContext = context.getScan(); + if (scanRanges.getPointLookupCount() == 1) { + // leverage bloom filter for single key point lookup by turning scan to + // Get Scan#isGetScan() + try { + scanFromContext = new Scan(context.getScan()); + } catch (IOException e) { + LOGGER.error("Failure to construct point lookup scan", e); + throw new PhoenixIOException(e); + } + scanFromContext.withStopRow(scanFromContext.getStartRow(), + scanFromContext.includeStartRow()); + } + scans.add(scanFromContext); parallelScans.add(scans); generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST, GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(), parallelScans, estimates, diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 6b0738e365..36f994d6ae 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -227,6 +227,11 @@ <artifactId>hbase-hadoop2-compat</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java new file mode 100644 index 0000000000..555650d2ee --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.flush; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerWrapper; +import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category(NeedsOwnMiniClusterTest.class) +public class BloomFilterIT extends ParallelStatsDisabledIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterIT.class); + private static class BloomFilterMetrics { + // total lookup requests + private long requestsCount; + // requests where key does not exist + private long negativeResultsCount; + // potential lookup requests rejected because no bloom filter present in storefile + private long eligibleRequestsCount; + + private BloomFilterMetrics() { + this.requestsCount = 0; + this.negativeResultsCount = 0; + this.eligibleRequestsCount = 0; + } + + private BloomFilterMetrics(long requestsCount, long negativeResultsCount, long eligibleRequestsCount) { + this.requestsCount = requestsCount; + this.negativeResultsCount = negativeResultsCount; + this.eligibleRequestsCount = eligibleRequestsCount; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BloomFilterMetrics rhs = (BloomFilterMetrics)obj; + return (this.requestsCount == rhs.requestsCount && + this.negativeResultsCount == rhs.negativeResultsCount && + this.eligibleRequestsCount == rhs.eligibleRequestsCount); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("requestsCount", requestsCount) + .add("negativeResultsCount", negativeResultsCount) + .add("eligibleRequestsCount", eligibleRequestsCount) + .toString(); + } + } + private BloomFilterMetrics beforeMetrics; + + private BloomFilterMetrics getBloomFilterMetrics() { + HBaseTestingUtility util = getUtility(); + HRegionServer regionServer = util.getHBaseCluster().getRegionServer(0); + MetricsRegionServer regionServerMetrics = regionServer.getMetrics(); + MetricsRegionServerWrapper regionServerWrapper = regionServerMetrics.getRegionServerWrapper(); + long requestsCount = regionServerWrapper.getBloomFilterRequestsCount(); + long negativeResultsCount = regionServerWrapper.getBloomFilterNegativeResultsCount(); + long eligibleRequestsCount = regionServerWrapper.getBloomFilterEligibleRequestsCount(); + return new BloomFilterMetrics(requestsCount, negativeResultsCount, eligibleRequestsCount); + } + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(HConstants.REGIONSERVER_METRICS_PERIOD, Long.toString(1000)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void testSetup() { + beforeMetrics = getBloomFilterMetrics(); + } + + @Test + public void testPointLookup() throws Exception { + String tableName = generateUniqueName(); + BloomFilterMetrics expectedMetrics = new BloomFilterMetrics(); + //String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) BLOOMFILTER='NONE'", tableName); + String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR)", tableName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(ddl); + populateTable(conn, tableName); + // flush the memstore to storefiles which will write the bloom filter + flush(getUtility(), TableName.valueOf(tableName)); + // negative key point lookup + String dql = String.format("SELECT * FROM %s where id = 7", tableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertFalse(rs.next()); + expectedMetrics.requestsCount +=1; + expectedMetrics.negativeResultsCount +=1; + } + // key exists + dql = String.format("SELECT * FROM %s where id = 4", tableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); + // negative request shouldn't be incremented since key exists + expectedMetrics.requestsCount +=1; + } + // multiple keys point lookup + dql = String.format("SELECT * FROM %s where id IN (4,7)", tableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); + assertFalse(rs.next()); + // bloom filter won't be used since scan start/stop key is different + } + verifyBloomFilterMetrics(expectedMetrics); + } + } + + @Test + public void testPointLookupOnSaltedTable() throws Exception { + String tableName = generateUniqueName(); + BloomFilterMetrics expectedMetrics = new BloomFilterMetrics(); + String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) SALT_BUCKETS=3", tableName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(ddl); + populateTable(conn, tableName); + flush(getUtility(), TableName.valueOf(tableName)); + // negative key point lookup + String dql = String.format("SELECT * FROM %s where id = 7", tableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertFalse(rs.next()); + expectedMetrics.requestsCount +=1; + expectedMetrics.negativeResultsCount +=1; + } + } + verifyBloomFilterMetrics(expectedMetrics); + } + + @Test + public void testAlterBloomFilter() throws Exception { + String tableName = generateUniqueName(); + BloomFilterMetrics expectedMetrics = new BloomFilterMetrics(); + String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) BLOOMFILTER='NONE'", tableName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(ddl); + populateTable(conn, tableName); + flush(getUtility(), TableName.valueOf(tableName)); + String dql = String.format("SELECT * FROM %s where id = 7", tableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertFalse(rs.next()); + // since bloom filter is not enabled + expectedMetrics.eligibleRequestsCount +=1; + verifyBloomFilterMetrics(expectedMetrics); + } + ddl = String.format("ALTER TABLE %s SET BLOOMFILTER='ROW'", tableName); + conn.createStatement().execute(ddl); + // alter table changes the table descriptor and table region re-opens reset metrics + beforeMetrics = getBloomFilterMetrics(); + expectedMetrics = new BloomFilterMetrics(); + // Insert 2 more rows + String dml = String.format("UPSERT INTO %s VALUES (100, 'val_100')", tableName); + conn.createStatement().execute(dml); + dml = String.format("UPSERT INTO %s VALUES (200, 'val_200')", tableName); + conn.createStatement().execute(dml); + conn.commit(); + // A new storefile should be created with bloom filter + flush(getUtility(), TableName.valueOf(tableName)); + dql = String.format("SELECT * FROM %s where id = 150", tableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertFalse(rs.next()); + expectedMetrics.requestsCount +=1; + expectedMetrics.negativeResultsCount +=1; + verifyBloomFilterMetrics(expectedMetrics); + } + } + } + + private void verifyBloomFilterMetrics(BloomFilterMetrics expectedMetrics) throws InterruptedException { + long metricsDelay = config.getLong(HConstants.REGIONSERVER_METRICS_PERIOD, + HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD); + // Ensure that the region server accumulates the metrics + Thread.sleep(metricsDelay + 1000); + BloomFilterMetrics afterMetrics = getBloomFilterMetrics(); + LOGGER.info("Before={} After={} Expected={}", beforeMetrics, afterMetrics, expectedMetrics); + + BloomFilterMetrics deltaMetrics = new BloomFilterMetrics( + afterMetrics.requestsCount - beforeMetrics.requestsCount, + afterMetrics.negativeResultsCount - beforeMetrics.negativeResultsCount, + afterMetrics.eligibleRequestsCount - beforeMetrics.eligibleRequestsCount); + + Assert.assertEquals(expectedMetrics, deltaMetrics); + } + + private void populateTable(Connection conn, String tableName) throws SQLException { + try(PreparedStatement ps = conn.prepareStatement(String.format("UPSERT INTO %s VALUES (?, ?)", tableName))) { + for (int i = 0; i < 16; i = i + 2) { + ps.setInt(1, i); + ps.setString(2, "val_" + i); + ps.executeUpdate(); + } + conn.commit(); + } + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java index 20bd097e04..c1584c19c5 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java @@ -2502,6 +2502,56 @@ public class WhereOptimizerTest extends BaseConnectionlessQueryTest { assertArrayEquals(stopRow, scan.getStopRow()); } + @Test + public void testScanRangeForPointLookup() throws SQLException { + String tenantId = "000000000000001"; + String entityId = "002333333333333"; + String query = String.format("select * from atable where organization_id='%s' and entity_id='%s'", + tenantId, entityId); + try (Connection conn = DriverManager.getConnection(getUrl())) { + QueryPlan optimizedPlan = TestUtil.getOptimizeQueryPlan(conn, query); + byte[] startRow = ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), PVarchar.INSTANCE.toBytes(entityId)); + byte[] stopRow = ByteUtil.nextKey(startRow); + validateScanRangesForPointLookup(optimizedPlan, startRow, stopRow); + } + } + + @Test + public void testScanRangeForPointLookupRVC() throws SQLException { + String tenantId = "000000000000001"; + String entityId = "002333333333333"; + String query = String.format("select * from atable where (organization_id, entity_id) IN (('%s','%s'))", + tenantId, entityId); + try (Connection conn = DriverManager.getConnection(getUrl())) { + QueryPlan optimizedPlan = TestUtil.getOptimizeQueryPlan(conn, query); + byte[] startRow = ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), PVarchar.INSTANCE.toBytes(entityId)); + byte[] stopRow = ByteUtil.nextKey(startRow); + validateScanRangesForPointLookup(optimizedPlan, startRow, stopRow); + } + } + + private static void validateScanRangesForPointLookup(QueryPlan optimizedPlan, byte[] startRow, byte[] stopRow) { + StatementContext context = optimizedPlan.getContext(); + ScanRanges scanRanges = context.getScanRanges(); + assertTrue(scanRanges.isPointLookup()); + assertEquals(1, scanRanges.getPointLookupCount()); + // scan from StatementContext has scan range [start, next(start)] + Scan scanFromContext = context.getScan(); + assertArrayEquals(startRow, scanFromContext.getStartRow()); + assertTrue(scanFromContext.includeStartRow()); + assertArrayEquals(stopRow, scanFromContext.getStopRow()); + assertFalse(scanFromContext.includeStopRow()); + + List<List<Scan>> scans = optimizedPlan.getScans(); + assertEquals(1, scans.size()); + assertEquals(1, scans.get(0).size()); + Scan scanFromIterator = scans.get(0).get(0); + // scan from iterator has same start and stop row [start, start] i.e a Get + assertTrue(scanFromIterator.isGetScan()); + assertTrue(scanFromIterator.includeStartRow()); + assertTrue(scanFromIterator.includeStopRow()); + } + private static StatementContext compileStatementTenantSpecific(String tenantId, String query, List<Object> binds) throws Exception { PhoenixConnection pconn = getTenantSpecificConnection("tenantId").unwrap(PhoenixConnection.class); PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index fe0d8d1b2c..cf076f08ef 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -810,6 +810,11 @@ public class TestUtil { conn.createStatement().execute(ddl); } + public static void flush(HBaseTestingUtility utility, TableName table) throws IOException { + Admin admin = utility.getAdmin(); + admin.flush(table); + } + public static void majorCompact(HBaseTestingUtility utility, TableName table) throws IOException, InterruptedException { long compactionRequestedSCN = EnvironmentEdgeManager.currentTimeMillis();