This is an automated email from the ASF dual-hosted git repository.

sanjeet pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new c2a7351595 PHOENIX-7619 Excess HFiles are being read to look for more than required column versions (#2184)
c2a7351595 is described below

commit c2a73515958eaf8141c4c568f29b71c497ace27f
Author: sanjeet006py <[email protected]>
AuthorDate: Wed Jun 11 21:37:25 2025 +0530

    PHOENIX-7619 Excess HFiles are being read to look for more than required column versions (#2184)
---
 .../coprocessor/BaseScannerRegionObserver.java     |   3 +-
 phoenix-core/pom.xml                               |  10 +
 .../phoenix/end2end/PreStoreScannerOpenIT.java     | 260 +++++++++++++++++++++
 pom.xml                                            |  11 +
 4 files changed, 283 insertions(+), 1 deletion(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d893f7f455..00d33304b3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -421,7 +421,8 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
 
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            options.setKeepDeletedCells(KeepDeletedCells.TTL);
+            options.setTTL(Long.MAX_VALUE);
             return;
         }
         if (!storeFileScanDoesntNeedAlteration(options)) {
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 25865e9203..73866373aa 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -449,6 +449,16 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>net.bytebuddy</groupId>
+            <artifactId>byte-buddy</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.bytebuddy</groupId>
+            <artifactId>byte-buddy-agent</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- logging -->
         <dependency>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreStoreScannerOpenIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreStoreScannerOpenIT.java
new file mode 100644
index 0000000000..1da0c89c2a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreStoreScannerOpenIT.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.CompactSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.agent.ByteBuddyAgent;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
+import net.bytebuddy.matcher.ElementMatchers;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PreStoreScannerOpenIT extends BaseTest {
+
+    private static final int TTL_IN_DAYS = 30;
+    // The table name is used to filter the seek/reseek calls
+    public static final String TABLE_NAME = generateUniqueName();
+
+    // Seek/reseek counters; public because the advice code below is inlined into StoreFileScanner
+    public static final AtomicLong SEEK_COUNT = new AtomicLong(0);
+    public static final AtomicLong RESEEK_COUNT = new AtomicLong(0);
+
+    // Advice for StoreFileScanner#seek/#reseek; counts only calls on TABLE_NAME's store files
+    public static class SeekAdvice {
+        @Advice.OnMethodEnter
+        public static void onSeek(@Advice.This StoreFileScanner scanner) {
+            if (scanner.getFilePath().toString().contains(TABLE_NAME)) {
+                SEEK_COUNT.incrementAndGet();
+                System.out.println("SEEK called! Total seeks: " + SEEK_COUNT.get());
+            }
+        }
+    }
+
+    public static class ReseekAdvice {
+        @Advice.OnMethodEnter
+        public static void onReseek(@Advice.This StoreFileScanner scanner) {
+            if (scanner.getFilePath().toString().contains(TABLE_NAME)) {
+                RESEEK_COUNT.incrementAndGet();
+                System.out.println("RESEEK called! Total reseeks: " + RESEEK_COUNT.get());
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        // Attach the ByteBuddy agent so the already-loaded StoreFileScanner can be redefined
+        ByteBuddyAgent.install();
+
+        // Instrument StoreFileScanner to track seek/reseek calls
+        new ByteBuddy().redefine(StoreFileScanner.class)
+            .visit(Advice.to(SeekAdvice.class).on(ElementMatchers.named("seek")))
+            .visit(Advice.to(ReseekAdvice.class).on(ElementMatchers.named("reseek"))).make()
+            .load(StoreFileScanner.class.getClassLoader(),
+                ClassReloadingStrategy.fromInstalledAgent());
+
+        System.out.println("ByteBuddy instrumentation installed for StoreFileScanner");
+
+        Map<String, String> props = new HashMap<>();
+        // Disable periodic flushes
+        props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0");
+        // Disable compactions
+        props.put(CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION, "false");
+        props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    /**
+     * Verifies that the store scanner does not perform extra seeks when scanning a table with
+     * Phoenix TTL enabled. Setting min and max versions to values greater than 1 in
+     * {@code BaseScannerRegionObserver#preStoreScannerOpen} makes StoreScanner seek into older
+     * store files for more column versions. Only seeks made by the final query are counted.
+     */
+    @Test
+    public void testStoreScannerNotDoingExtraSeeks() throws Exception {
+        createTable(TABLE_NAME);
+
+        try (Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            stmt.execute("UPSERT INTO " + TABLE_NAME + " (id, col1) VALUES (1, 'a')");
+            stmt.execute("UPSERT INTO " + TABLE_NAME + " (id, col1) VALUES (10, 'b')");
+            conn.commit();
+            getUtility().flush(TableName.valueOf(TABLE_NAME));
+
+            stmt.execute("UPSERT INTO " + TABLE_NAME + " (id, col1) VALUES (2, 'c')");
+            stmt.execute("UPSERT INTO " + TABLE_NAME + " (id, col1) VALUES (9, 'd')");
+            conn.commit();
+            getUtility().flush(TableName.valueOf(TABLE_NAME));
+
+            stmt.execute("UPSERT INTO " + TABLE_NAME + " (id, col1) VALUES (3, 'e')");
+            conn.commit();
+            // The newly inserted row will be in memstore only
+        }
+
+        Assert.assertEquals(2, getStoreFileCount(TABLE_NAME));
+        SEEK_COUNT.set(0);
+        RESEEK_COUNT.set(0);
+
+        try (Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("SELECT col1 FROM " + TABLE_NAME + " WHERE id = 3");
+            Assert.assertTrue(rs.next());
+            Assert.assertEquals("e", rs.getString("col1"));
+            Assert.assertFalse(rs.next());
+        }
+        Assert.assertEquals(0, SEEK_COUNT.get());
+        Assert.assertEquals(0, RESEEK_COUNT.get());
+    }
+
+    /**
+     * Verifies that SCN queries can see rows deleted after the SCN. Setting keepDeletedCells
+     * to {@code KeepDeletedCells.FALSE} in
+     * {@code BaseScannerRegionObserver#preStoreScannerOpen} would hide them from SCN queries.
+     */
+    @Test
+    public void testSCNScansCanSeeDeletedRows() throws Exception {
+        String tableName = generateUniqueName();
+        createTable(tableName);
+
+        long beforeRowWasDeleted;
+        try (Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            stmt.execute("UPSERT INTO " + tableName + " (id, col1) VALUES (1, 'a')");
+            conn.commit();
+            getUtility().flush(TableName.valueOf(tableName));
+
+            beforeRowWasDeleted = EnvironmentEdgeManager.currentTimeMillis();
+            // Sleep for 1ms so the insert and the delete get different timestamps
+            Thread.sleep(1);
+
+            stmt.execute("DELETE FROM " + tableName + " WHERE id = 1");
+            conn.commit();
+            getUtility().flush(TableName.valueOf(tableName));
+
+            Thread.sleep(1);
+
+            // Check that the row is not visible for non-SCN queries
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName + " WHERE id = 1");
+            Assert.assertFalse(rs.next());
+        }
+
+        Properties props = new Properties();
+        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(beforeRowWasDeleted));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+            Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName + " WHERE id = 1");
+            Assert.assertTrue(rs.next());
+            Assert.assertFalse(rs.next());
+        }
+    }
+
+    /**
+     * Verifies that rows are not expired partially. Setting the TTL to {@link HConstants#FOREVER}
+     * in {@code BaseScannerRegionObserver#preStoreScannerOpen} would make StoreScanner drop cells
+     * that HBase considers expired, causing rows to expire only partially.
+     */
+    @Test
+    public void testRowsAreNotExpiredPartially() throws Exception {
+        String tableName = generateUniqueName();
+        createTable(tableName);
+
+        ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+        injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+
+        try (Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            stmt.execute("UPSERT INTO " + tableName + " (id, col1, col2) VALUES (1, 'a', 'ab')");
+            conn.commit();
+
+            injectEdge.incrementValue(1);
+            getUtility().flush(TableName.valueOf(tableName));
+            // Age the first cells by FOREVER ms while keeping the row within the table TTL
+            Assert.assertTrue(HConstants.FOREVER / (24 * 60 * 60 * 1000) < TTL_IN_DAYS);
+            injectEdge.incrementValue(HConstants.FOREVER);
+
+            stmt.execute("UPSERT INTO " + tableName + " (id, col1) VALUES (1, 'b')");
+            conn.commit();
+
+            injectEdge.incrementValue(1);
+
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName + " WHERE id = 1");
+            Assert.assertTrue(rs.next());
+            Assert.assertEquals(1, rs.getInt("id"));
+            Assert.assertEquals("b", rs.getString("col1"));
+            Assert.assertEquals("ab", rs.getString("col2"));
+            Assert.assertFalse(rs.next());
+        } finally {
+            EnvironmentEdgeManager.reset();
+        }
+    }
+
+    private void createTable(String tableName) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE " + tableName
+                + " (id INTEGER PRIMARY KEY, col1 VARCHAR, col2 VARCHAR) BLOOMFILTER = NONE, TTL = "
+                + TTL_IN_DAYS * 24 * 60 * 60);
+            conn.commit();
+        }
+    }
+
+    private int getStoreFileCount(String tableName) throws Exception {
+        TableName table = TableName.valueOf(tableName);
+        int totalStoreFiles = 0;
+
+        // Collect this table's regions from every region server in the mini cluster
+        List<HRegion> regions = getUtility().getHBaseCluster().getRegionServerThreads().stream()
+            .flatMap(rs -> rs.getRegionServer().getRegions(table).stream())
+            .collect(Collectors.toList());
+
+        // Sum the store file counts of every store in those regions
+        for (HRegion region : regions) {
+            for (HStore store : region.getStores()) {
+                totalStoreFiles += store.getStorefilesCount();
+            }
+        }
+        return totalStoreFiles;
+    }
+}
diff --git a/pom.xml b/pom.xml
index 3bc319b252..80d8af7fec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,6 +140,7 @@
     <mockito.version>4.11.0</mockito.version>
     <junit.version>4.13.1</junit.version>
     <hdrhistogram.version>2.1.12</hdrhistogram.version>
+    <byte-buddy.version>1.15.11</byte-buddy.version>
 
     <!-- These are only used for exclusion when shading, and the exact version 
is completely
     irrelevant, but we need to keep them up to date to appease static 
checkers. While Phoenix does
@@ -1950,6 +1951,16 @@
         <type>pom</type>
         <scope>import</scope>
       </dependency>
+      <dependency>
+        <groupId>net.bytebuddy</groupId>
+        <artifactId>byte-buddy</artifactId>
+        <version>${byte-buddy.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>net.bytebuddy</groupId>
+        <artifactId>byte-buddy-agent</artifactId>
+        <version>${byte-buddy.version}</version>
+      </dependency>
 
        <!-- Runtime dependencies -->
       <dependency>

Reply via email to