This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 00bb0d0517 PHOENIX-7229 Leverage bloom filters for single key point lookups (#1832)
00bb0d0517 is described below

commit 00bb0d0517cb9d56dcf6c61b37421728667d2753
Author: tkhurana <khurana.ta...@gmail.com>
AuthorDate: Sat Feb 24 21:20:15 2024 -0800

    PHOENIX-7229 Leverage bloom filters for single key point lookups (#1832)
    
    Co-authored-by: Viraj Jasani <vjas...@apache.org>
---
 .../phoenix/iterate/BaseResultIterators.java       |  17 +-
 phoenix-core/pom.xml                               |   5 +
 .../org/apache/phoenix/end2end/BloomFilterIT.java  | 244 +++++++++++++++++++++
 .../apache/phoenix/compile/WhereOptimizerTest.java |  50 +++++
 .../java/org/apache/phoenix/util/TestUtil.java     |   5 +
 5 files changed, 320 insertions(+), 1 deletion(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 5fda525316..359cf2255b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -37,6 +37,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.EOFException;
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.Arrays;
@@ -72,6 +73,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.coprocessorclient.HashJoinCacheNotFoundException;
 import org.apache.phoenix.coprocessorclient.UngroupedAggregateRegionObserverHelper;
+import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.ScanPlan;
@@ -936,7 +938,20 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         if (!isLocalIndex && scanRanges.isPointLookup() && !scanRanges.useSkipScanFilter()) {
             List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(1);
             List<Scan> scans = Lists.newArrayListWithExpectedSize(1);
-            scans.add(context.getScan());
+            Scan scanFromContext = context.getScan();
+            if (scanRanges.getPointLookupCount() == 1) {
+                // Leverage the bloom filter for a single key point lookup by
+                // turning the scan into a Get (see Scan#isGetScan())
+                try {
+                    scanFromContext = new Scan(context.getScan());
+                } catch (IOException e) {
+                    LOGGER.error("Failure to construct point lookup scan", e);
+                    throw new PhoenixIOException(e);
+                }
+                scanFromContext.withStopRow(scanFromContext.getStartRow(),
+                    scanFromContext.includeStartRow());
+            }
+            scans.add(scanFromContext);
             parallelScans.add(scans);
             generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST,
                     GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(), parallelScans, estimates,
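
As context for the change above, a minimal standalone sketch (not part of the
patch; the row key is hypothetical) of the HBase client behavior it relies on:
a Scan whose start and stop rows are equal and both inclusive satisfies
Scan#isGetScan(), which lets the region server consult store file bloom
filters before seeking.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    static Scan pointLookupScan(byte[] rowKey) {
        Scan scan = new Scan()
                .withStartRow(rowKey, true)   // inclusive start
                .withStopRow(rowKey, true);   // stop == start, also inclusive
        // With equal, inclusive bounds HBase treats the scan as a Get, so row
        // bloom filters can cheaply rule out store files that lack the key.
        assert scan.isGetScan();
        return scan;
    }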
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 6b0738e365..36f994d6ae 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -227,6 +227,11 @@
             <artifactId>hbase-hadoop2-compat</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-hadoop-compat</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-mapreduce</artifactId>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java
new file mode 100644
index 0000000000..555650d2ee
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BloomFilterIT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.flush;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerWrapper;
+import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class BloomFilterIT extends ParallelStatsDisabledIT {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterIT.class);
+    private static class BloomFilterMetrics {
+        // total lookup requests
+        private long requestsCount;
+        // requests where key does not exist
+        private long negativeResultsCount;
+        // lookup requests that could have used a bloom filter but none was present in the storefile
+
+        private BloomFilterMetrics() {
+            this.requestsCount = 0;
+            this.negativeResultsCount = 0;
+            this.eligibleRequestsCount = 0;
+        }
+
+        private BloomFilterMetrics(long requestsCount, long negativeResultsCount, long eligibleRequestsCount) {
+            this.requestsCount = requestsCount;
+            this.negativeResultsCount = negativeResultsCount;
+            this.eligibleRequestsCount = eligibleRequestsCount;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            BloomFilterMetrics rhs = (BloomFilterMetrics)obj;
+            return (this.requestsCount == rhs.requestsCount &&
+                    this.negativeResultsCount == rhs.negativeResultsCount &&
+                    this.eligibleRequestsCount == rhs.eligibleRequestsCount);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("requestsCount", requestsCount)
+                    .add("negativeResultsCount", negativeResultsCount)
+                    .add("eligibleRequestsCount", eligibleRequestsCount)
+                    .toString();
+        }
+    }
+    private BloomFilterMetrics beforeMetrics;
+
+    private BloomFilterMetrics getBloomFilterMetrics() {
+        HBaseTestingUtility util = getUtility();
+        HRegionServer regionServer = util.getHBaseCluster().getRegionServer(0);
+        MetricsRegionServer regionServerMetrics = regionServer.getMetrics();
+        MetricsRegionServerWrapper regionServerWrapper = regionServerMetrics.getRegionServerWrapper();
+        long requestsCount = regionServerWrapper.getBloomFilterRequestsCount();
+        long negativeResultsCount = regionServerWrapper.getBloomFilterNegativeResultsCount();
+        long eligibleRequestsCount = regionServerWrapper.getBloomFilterEligibleRequestsCount();
+        return new BloomFilterMetrics(requestsCount, negativeResultsCount, eligibleRequestsCount);
+    }
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(HConstants.REGIONSERVER_METRICS_PERIOD, Long.toString(1000));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void testSetup() {
+        beforeMetrics = getBloomFilterMetrics();
+    }
+
+    @Test
+    public void testPointLookup() throws Exception {
+        String tableName = generateUniqueName();
+        BloomFilterMetrics expectedMetrics = new BloomFilterMetrics();
+        //String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) BLOOMFILTER='NONE'", tableName);
+        String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR)", tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(ddl);
+            populateTable(conn, tableName);
+            // flush the memstore to storefiles which will write the bloom filter
+            flush(getUtility(), TableName.valueOf(tableName));
+            // negative key point lookup
+            String dql = String.format("SELECT * FROM %s where id = 7", tableName);
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                assertFalse(rs.next());
+                expectedMetrics.requestsCount +=1;
+                expectedMetrics.negativeResultsCount +=1;
+            }
+            // key exists
+            dql = String.format("SELECT * FROM %s where id = 4", tableName);
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                assertTrue(rs.next());
+                assertEquals(4, rs.getInt(1));
+                // negative request shouldn't be incremented since key exists
+                expectedMetrics.requestsCount +=1;
+            }
+            // multiple keys point lookup
+            dql = String.format("SELECT * FROM %s where id IN (4,7)", tableName);
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                assertTrue(rs.next());
+                assertEquals(4, rs.getInt(1));
+                assertFalse(rs.next());
+                // bloom filter won't be used since scan start/stop key is different
+            }
+            verifyBloomFilterMetrics(expectedMetrics);
+        }
+    }
+
+    @Test
+    public void testPointLookupOnSaltedTable() throws Exception {
+        String tableName = generateUniqueName();
+        BloomFilterMetrics expectedMetrics = new BloomFilterMetrics();
+        String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) SALT_BUCKETS=3", tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(ddl);
+            populateTable(conn, tableName);
+            flush(getUtility(), TableName.valueOf(tableName));
+            // negative key point lookup
+            String dql = String.format("SELECT * FROM %s where id = 7", tableName);
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                assertFalse(rs.next());
+                expectedMetrics.requestsCount +=1;
+                expectedMetrics.negativeResultsCount +=1;
+            }
+        }
+        verifyBloomFilterMetrics(expectedMetrics);
+    }
+
+    @Test
+    public void testAlterBloomFilter() throws Exception {
+        String tableName = generateUniqueName();
+        BloomFilterMetrics expectedMetrics = new BloomFilterMetrics();
+        String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, V1 VARCHAR) BLOOMFILTER='NONE'", tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(ddl);
+            populateTable(conn, tableName);
+            flush(getUtility(), TableName.valueOf(tableName));
+            String dql = String.format("SELECT * FROM %s where id = 7", tableName);
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                assertFalse(rs.next());
+                // only the eligible count increments since the bloom filter is not enabled
+                expectedMetrics.eligibleRequestsCount +=1;
+                verifyBloomFilterMetrics(expectedMetrics);
+            }
+            ddl = String.format("ALTER TABLE %s SET BLOOMFILTER='ROW'", tableName);
+            conn.createStatement().execute(ddl);
+            // ALTER TABLE changes the table descriptor; the region re-opens, which resets the metrics
+            beforeMetrics = getBloomFilterMetrics();
+            expectedMetrics = new BloomFilterMetrics();
+            // Insert 2 more rows
+            String dml = String.format("UPSERT INTO %s VALUES (100, 'val_100')", tableName);
+            conn.createStatement().execute(dml);
+            dml = String.format("UPSERT INTO %s VALUES (200, 'val_200')", tableName);
+            conn.createStatement().execute(dml);
+            conn.commit();
+            // A new storefile should be created with a bloom filter
+            flush(getUtility(), TableName.valueOf(tableName));
+            dql = String.format("SELECT * FROM %s where id = 150", tableName);
+            try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+                assertFalse(rs.next());
+                expectedMetrics.requestsCount +=1;
+                expectedMetrics.negativeResultsCount +=1;
+                verifyBloomFilterMetrics(expectedMetrics);
+            }
+        }
+    }
+
+    private void verifyBloomFilterMetrics(BloomFilterMetrics expectedMetrics) throws InterruptedException {
+        long metricsDelay = config.getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
+                HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD);
+        // Ensure that the region server accumulates the metrics
+        Thread.sleep(metricsDelay + 1000);
+        BloomFilterMetrics afterMetrics = getBloomFilterMetrics();
+        LOGGER.info("Before={} After={} Expected={}", beforeMetrics, afterMetrics, expectedMetrics);
+
+        BloomFilterMetrics deltaMetrics = new BloomFilterMetrics(
+                afterMetrics.requestsCount - beforeMetrics.requestsCount,
+                afterMetrics.negativeResultsCount - beforeMetrics.negativeResultsCount,
+                afterMetrics.eligibleRequestsCount - beforeMetrics.eligibleRequestsCount);
+
+        Assert.assertEquals(expectedMetrics, deltaMetrics);
+    }
+
+    private void populateTable(Connection conn, String tableName) throws SQLException {
+        try (PreparedStatement ps = conn.prepareStatement(String.format("UPSERT INTO %s VALUES (?, ?)", tableName))) {
+            for (int i = 0; i < 16; i = i + 2) {
+                ps.setInt(1, i);
+                ps.setString(2, "val_" + i);
+                ps.executeUpdate();
+            }
+            conn.commit();
+        }
+    }
+}
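
From the user's side, a minimal usage sketch (table name and JDBC URL are
illustrative, not from the patch) of the behavior the IT above verifies: with
a row bloom filter on the table, a single-key point lookup runs as a Get-style
scan, and a miss can be answered by the bloom filter without a store file read.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    static void pointLookupDemo(String jdbcUrl) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE T (ID INTEGER NOT NULL PRIMARY KEY,"
                    + " V1 VARCHAR) BLOOMFILTER='ROW'");
            stmt.execute("UPSERT INTO T VALUES (1, 'val_1')");
            conn.commit();
            // ID = 7 was never written; once the memstore is flushed, the row
            // bloom filter can reject this lookup before any store file seek
            try (ResultSet rs = stmt.executeQuery("SELECT V1 FROM T WHERE ID = 7")) {
                assert !rs.next();
            }
        }
    }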
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
index 20bd097e04..c1584c19c5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereOptimizerTest.java
@@ -2502,6 +2502,56 @@ public class WhereOptimizerTest extends BaseConnectionlessQueryTest {
         assertArrayEquals(stopRow, scan.getStopRow());
     }
 
+    @Test
+    public void testScanRangeForPointLookup() throws SQLException {
+        String tenantId = "000000000000001";
+        String entityId = "002333333333333";
+        String query = String.format("select * from atable where organization_id='%s' and entity_id='%s'",
+                tenantId, entityId);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            QueryPlan optimizedPlan = TestUtil.getOptimizeQueryPlan(conn, query);
+            byte[] startRow = ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), PVarchar.INSTANCE.toBytes(entityId));
+            byte[] stopRow = ByteUtil.nextKey(startRow);
+            validateScanRangesForPointLookup(optimizedPlan, startRow, stopRow);
+        }
+    }
+
+    @Test
+    public void testScanRangeForPointLookupRVC() throws SQLException {
+        String tenantId = "000000000000001";
+        String entityId = "002333333333333";
+        String query = String.format("select * from atable where (organization_id, entity_id) IN (('%s','%s'))",
+                tenantId, entityId);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            QueryPlan optimizedPlan = TestUtil.getOptimizeQueryPlan(conn, query);
+            byte[] startRow = ByteUtil.concat(PVarchar.INSTANCE.toBytes(tenantId), PVarchar.INSTANCE.toBytes(entityId));
+            byte[] stopRow = ByteUtil.nextKey(startRow);
+            validateScanRangesForPointLookup(optimizedPlan, startRow, stopRow);
+        }
+    }
+
+    private static void validateScanRangesForPointLookup(QueryPlan optimizedPlan, byte[] startRow, byte[] stopRow) {
+        StatementContext context = optimizedPlan.getContext();
+        ScanRanges scanRanges = context.getScanRanges();
+        assertTrue(scanRanges.isPointLookup());
+        assertEquals(1, scanRanges.getPointLookupCount());
+        // scan from StatementContext has scan range [start, next(start)) with an exclusive stop row
+        Scan scanFromContext = context.getScan();
+        assertArrayEquals(startRow, scanFromContext.getStartRow());
+        assertTrue(scanFromContext.includeStartRow());
+        assertArrayEquals(stopRow, scanFromContext.getStopRow());
+        assertFalse(scanFromContext.includeStopRow());
+
+        List<List<Scan>> scans = optimizedPlan.getScans();
+        assertEquals(1, scans.size());
+        assertEquals(1, scans.get(0).size());
+        Scan scanFromIterator = scans.get(0).get(0);
+        // scan from iterator has the same start and stop row, [start, start], i.e. a Get
+        assertTrue(scanFromIterator.isGetScan());
+        assertTrue(scanFromIterator.includeStartRow());
+        assertTrue(scanFromIterator.includeStopRow());
+    }
+
     private static StatementContext compileStatementTenantSpecific(String tenantId, String query, List<Object> binds) throws Exception {
         PhoenixConnection pconn = getTenantSpecificConnection("tenantId").unwrap(PhoenixConnection.class);
         PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query);
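
A side note on the two equivalent ranges asserted in the new tests, sketched
with the test's own values (reusing its imports): ByteUtil.nextKey increments
the key's last byte, carrying on overflow, so for atable's fixed-width
CHAR(15) key columns the exclusive-stop range covers exactly one row.

    byte[] startRow = ByteUtil.concat(PVarchar.INSTANCE.toBytes("000000000000001"),
            PVarchar.INSTANCE.toBytes("002333333333333"));
    byte[] stopRow = ByteUtil.nextKey(startRow);  // last byte incremented, with carry
    // [startRow, stopRow) with an exclusive stop contains exactly one row key,
    // the same row as the inclusive [startRow, startRow] Get-style scan that
    // BaseResultIterators now hands to the parallel iterators.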
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index fe0d8d1b2c..cf076f08ef 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -810,6 +810,11 @@ public class TestUtil {
         conn.createStatement().execute(ddl);
     }
 
+    public static void flush(HBaseTestingUtility utility, TableName table) throws IOException {
+        Admin admin = utility.getAdmin();
+        admin.flush(table);
+    }
+
     public static void majorCompact(HBaseTestingUtility utility, TableName table)
         throws IOException, InterruptedException {
         long compactionRequestedSCN = EnvironmentEdgeManager.currentTimeMillis();
