This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c554d9dc4e [core] Refactor limit pushdown in KeyValueFileStoreScan
c554d9dc4e is described below

commit c554d9dc4e102a490cc5cadb863b95b972aef39d
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jan 16 15:18:15 2026 +0800

    [core] Refactor limit pushdown in KeyValueFileStoreScan
---
 .../paimon/operation/AbstractFileStoreScan.java    |   8 -
 .../paimon/operation/AppendOnlyFileStoreScan.java  |  80 +--------
 .../operation/DataEvolutionFileStoreScan.java      |   5 -
 .../paimon/operation/KeyValueFileStoreScan.java    | 190 +++++----------------
 .../operation/KeyValueFileStoreScanTest.java       |   6 +-
 5 files changed, 53 insertions(+), 236 deletions(-)
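
At a glance: the iterator-based limit pushdown is folded into postFilterManifestEntries, and the eligibility hook becomes limitPushdownEnabled. A hedged usage sketch of the caller-facing path follows; withSnapshot, withLimit and limitPushdownEnabled appear in the diffs and tests below, while plan().files() is assumed to be the usual FileStoreScan planning call rather than something this commit adds.

    import java.util.List;

    import org.apache.paimon.manifest.ManifestEntry;
    import org.apache.paimon.operation.KeyValueFileStoreScan;

    // Hedged sketch, not part of this commit: how a caller reaches the
    // refactored pushdown path.
    public class LimitScanSketch {

        static List<ManifestEntry> planWithLimit(KeyValueFileStoreScan scan, long snapshotId) {
            // withSnapshot/withLimit as used in KeyValueFileStoreScanTest below.
            scan.withSnapshot(snapshotId).withLimit(10);
            // New eligibility gate: false for PARTIAL_UPDATE/AGGREGATE merge
            // engines or when deletion vectors are enabled.
            System.out.println("limit pushdown eligible: " + scan.limitPushdownEnabled());
            // Assumed standard planning call; when eligible, its manifest
            // entries have been pruned by postFilterManifestEntries.
            return scan.plan().files();
        }
    }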

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index fdefcd0d35..aa3f1ccf7a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -444,14 +444,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         return false;
     }
 
-    protected boolean supportsLimitPushManifestEntries() {
-        return false;
-    }
-
-    protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
-        throw new UnsupportedOperationException();
-    }
-
     protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
         throw new UnsupportedOperationException();
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index bf7504a5fb..49071045be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -34,7 +34,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -88,25 +87,22 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
         return this;
     }
 
-    @Override
-    public boolean supportsLimitPushManifestEntries() {
-        return limit != null && limit > 0 && !deletionVectorsEnabled;
-    }
-
     @Override
     protected boolean postFilterManifestEntriesEnabled() {
-        return supportsLimitPushManifestEntries();
+        return limit != null && limit > 0 && !deletionVectorsEnabled;
     }
 
     @Override
     protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
         checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled);
-        // Use LimitAwareManifestEntryIterator for limit pushdown
-        Iterator<ManifestEntry> iterator =
-                new LimitAwareManifestEntryIterator(entries.iterator(), limit);
         List<ManifestEntry> result = new ArrayList<>();
-        while (iterator.hasNext()) {
-            result.add(iterator.next());
+        long accumulatedRowCount = 0;
+        for (ManifestEntry entry : entries) {
+            result.add(entry);
+            accumulatedRowCount += entry.file().rowCount();
+            if (accumulatedRowCount >= limit) {
+                break;
+            }
         }
         return result;
     }
@@ -173,64 +169,4 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
             throw new RuntimeException("Exception happens while checking predicate.", e);
         }
     }
-
-    /**
-     * Iterator that applies limit pushdown by stopping early when enough rows have been
-     * accumulated.
-     */
-    private static class LimitAwareManifestEntryIterator implements Iterator<ManifestEntry> {
-        private final Iterator<ManifestEntry> baseIterator;
-        private final long limit;
-
-        private long accumulatedRowCount = 0;
-        private ManifestEntry nextEntry = null;
-        private boolean hasNext = false;
-
-        LimitAwareManifestEntryIterator(Iterator<ManifestEntry> baseIterator, long limit) {
-            this.baseIterator = baseIterator;
-            this.limit = limit;
-            advance();
-        }
-
-        private void advance() {
-            // If we've already accumulated enough rows, stop reading more entries
-            if (accumulatedRowCount >= limit) {
-                hasNext = false;
-                nextEntry = null;
-                return;
-            }
-
-            if (baseIterator.hasNext()) {
-                nextEntry = baseIterator.next();
-                hasNext = true;
-
-                long fileRowCount = nextEntry.file().rowCount();
-                if (fileRowCount > 0) {
-                    accumulatedRowCount += fileRowCount;
-                }
-
-                return;
-            }
-
-            // No more base entries
-            hasNext = false;
-            nextEntry = null;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return hasNext;
-        }
-
-        @Override
-        public ManifestEntry next() {
-            // This exception is only thrown if next() is called when hasNext() returns false.
-            if (!hasNext) {
-                throw new java.util.NoSuchElementException();
-            }
-            ManifestEntry current = nextEntry;
-            advance();
-            return current;
-        }
-    }
 }
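
The inlined loop above and the removed LimitAwareManifestEntryIterator implement the same eager accumulation. A self-contained sketch of that idea, with FileMeta as an illustrative stand-in for ManifestEntry#file() rather than a Paimon class:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class LimitPruneSketch {

        // Stand-in for the row-count stats read from ManifestEntry#file().
        static final class FileMeta {
            final long rowCount;

            FileMeta(long rowCount) {
                this.rowCount = rowCount;
            }
        }

        // Keep files until their cumulative row count covers the limit. The
        // last kept file may overshoot: pruning only guarantees at least
        // `limit` rows survive, and the reader applies the exact limit.
        static List<FileMeta> prune(List<FileMeta> files, long limit) {
            List<FileMeta> result = new ArrayList<>();
            long accumulated = 0;
            for (FileMeta file : files) {
                result.add(file);
                accumulated += file.rowCount;
                if (accumulated >= limit) {
                    break;
                }
            }
            return result;
        }

        public static void main(String[] args) {
            List<FileMeta> files = Arrays.asList(new FileMeta(4), new FileMeta(4), new FileMeta(4));
            System.out.println(prune(files, 7).size()); // 2: 4 + 4 >= 7
        }
    }

One behavioral note from the diff: the old iterator only added positive row counts, while the new loop adds rowCount() unconditionally; the two are equivalent as long as row counts are non-negative.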
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index c2695ce682..db9e5db807 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -116,11 +116,6 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
         return this;
     }
 
-    @Override
-    public boolean supportsLimitPushManifestEntries() {
-        return false;
-    }
-
     @Override
     protected boolean postFilterManifestEntriesEnabled() {
         return inputFilter != null;
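
Taken together, limit pushdown eligibility now lives entirely in the post-filter gate, so DataEvolutionFileStoreScan opts out simply by overriding it. A condensed, compilable sketch of the override chain, with class names and stub fields as stand-ins abbreviated from the diffs above:

    // Condensed from this commit's diffs; fields stubbed so it compiles alone.
    abstract class BaseScanSketch {
        Long limit;
        boolean deletionVectorsEnabled;

        // Base class: post-filtering is off unless a subclass opts in.
        protected boolean postFilterManifestEntriesEnabled() {
            return false;
        }
    }

    class AppendOnlyScanSketch extends BaseScanSketch {
        // Append-only: post-filter exactly when limit pushdown is eligible.
        @Override
        protected boolean postFilterManifestEntriesEnabled() {
            return limit != null && limit > 0 && !deletionVectorsEnabled;
        }
    }

    class DataEvolutionScanSketch extends AppendOnlyScanSketch {
        Object inputFilter;

        // Data evolution: post-filters only for its input filter, so the
        // removed supportsLimitPushManifestEntries() override is unneeded.
        @Override
        protected boolean postFilterManifestEntriesEnabled() {
            return inputFilter != null;
        }
    }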
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 2ac05b7385..a09fb25ee6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.io.DataFileMeta;
@@ -38,9 +39,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -50,7 +48,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
@@ -59,8 +57,6 @@ import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
 /** {@link FileStoreScan} for {@link KeyValueFileStore}. */
 public class KeyValueFileStoreScan extends AbstractFileStoreScan {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class);
-
     private final SimpleStatsEvolutions fieldKeyStatsConverters;
     private final SimpleStatsEvolutions fieldValueStatsConverters;
     private final BucketSelectConverter bucketSelectConverter;
@@ -209,168 +205,66 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
         }
     }
 
-    /**
-     * Check if limit pushdown is supported for PK tables.
-     *
-     * <p>Not supported when merge engine is PARTIAL_UPDATE/AGGREGATE (need merge) or deletion
-     * vectors are enabled (can't count deleted rows). For DEDUPLICATE/FIRST_ROW, per-bucket checks
-     * (no overlapping, no delete rows) are done in applyLimitPushdownForBucket.
-     */
     @Override
-    public boolean supportsLimitPushManifestEntries() {
-        if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) {
-            return false;
-        }
-
-        return limit != null && limit > 0 && !deletionVectorsEnabled;
+    protected boolean postFilterManifestEntriesEnabled() {
+        return wholeBucketFilterEnabled() || limitPushdownEnabled();
     }
 
-    /**
-     * Apply limit pushdown for a single bucket. Returns files to include, or null if unsafe.
-     *
-     * <p>Returns null if files overlap (LSM level 0 or different levels) or have delete rows. For
-     * non-overlapping files with no delete rows, accumulates row counts until limit is reached.
-     *
-     * @param bucketEntries files in the same bucket
-     * @param limit the limit to apply
-     * @return files to include, or null if we can't safely push down limit
-     */
-    @Nullable
-    private List<ManifestEntry> applyLimitPushdownForBucket(
-            List<ManifestEntry> bucketEntries, long limit) {
-        // Check if this bucket has overlapping files (LSM property)
-        boolean hasOverlapping = !noOverlapping(bucketEntries);
-
-        if (hasOverlapping) {
-            // For buckets with overlapping, we can't safely push down limit because files
-            // need to be merged and we can't accurately calculate the merged row count.
-            return null;
-        }
-
-        // For buckets without overlapping and with merge engines that don't require
-        // merge (DEDUPLICATE or FIRST_ROW), we can safely accumulate row count
-        // and stop when limit is reached, but only if files have no delete rows.
-        List<ManifestEntry> result = new ArrayList<>();
-        long accumulatedRowCount = 0;
-
-        for (ManifestEntry entry : bucketEntries) {
-            long fileRowCount = entry.file().rowCount();
-            // Check if file has delete rows - if so, we can't accurately calculate
-            // the merged row count, so we need to stop limit pushdown
-            boolean hasDeleteRows =
-                    entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
-
-            if (hasDeleteRows) {
-                // If file has delete rows, we can't accurately calculate merged row count
-                // without reading the actual data. Can't safely push down limit.
-                return null;
-            }
+    private boolean wholeBucketFilterEnabled() {
+        return valueFilter != null && scanMode == ScanMode.ALL;
+    }
 
-            // File has no delete rows, no overlapping, and merge engine doesn't require merge.
-            // Safe to count rows.
-            result.add(entry);
-            accumulatedRowCount += fileRowCount;
-            if (accumulatedRowCount >= limit) {
-                break;
-            }
+    @VisibleForTesting
+    public boolean limitPushdownEnabled() {
+        if (limit == null || limit <= 0) {
+            return false;
         }
 
-        return result;
-    }
-
-    @Override
-    protected boolean postFilterManifestEntriesEnabled() {
-        return (valueFilter != null && scanMode == ScanMode.ALL)
-                || supportsLimitPushManifestEntries();
+        return mergeEngine != PARTIAL_UPDATE && mergeEngine != AGGREGATE && !deletionVectorsEnabled;
     }
 
     @Override
     protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> files) {
-        long startTime = System.nanoTime();
         Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);
-
-        // Apply filter if valueFilter is enabled, otherwise use identity function
-        Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor =
-                (valueFilter != null && scanMode == ScanMode.ALL)
-                        ? this::doFilterWholeBucketByStats
-                        : Function.identity();
-
-        // Apply filter (if enabled) and limit pushdown (if enabled)
-        boolean limitEnabled = supportsLimitPushManifestEntries();
-        List<ManifestEntry> result =
-                applyLimitPushdownToBuckets(buckets, bucketProcessor, limitEnabled);
-
-        if (limitEnabled) {
-            long duration = (System.nanoTime() - startTime) / 1_000_000;
-            LOG.info(
-                    "Limit pushdown for PK table completed in {} ms. Limit: {}, InputFiles: {}, OutputFiles: {}, "
-                            + "MergeEngine: {}, ScanMode: {}, DeletionVectorsEnabled: {}",
-                    duration,
-                    limit,
-                    files.size(),
-                    result.size(),
-                    mergeEngine,
-                    scanMode,
-                    deletionVectorsEnabled);
-        }
-
-        return result;
-    }
-
-    /**
-     * Apply limit pushdown to buckets with an optional bucket processor (e.g., filtering).
-     *
-     * <p>This method processes buckets in order, applying the bucket processor first, then applying
-     * limit pushdown if enabled. It stops early when the limit is reached.
-     *
-     * @param buckets buckets grouped by (partition, bucket)
-     * @param bucketProcessor processor to apply to each bucket before limit pushdown
-     * @return processed entries (filtered and limited if limit is enabled)
-     */
-    private List<ManifestEntry> applyLimitPushdownToBuckets(
-            Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets,
-            Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor,
-            boolean limitEnabled) {
         List<ManifestEntry> result = new ArrayList<>();
-        long accumulatedRowCount = 0;
-
-        for (List<ManifestEntry> bucketEntries : buckets.values()) {
-            // Apply bucket processor (e.g., filtering)
-            List<ManifestEntry> processed = bucketProcessor.apply(bucketEntries);
+        AtomicLong currentRowCount = new AtomicLong(0);
+        for (List<ManifestEntry> entries : buckets.values()) {
+            boolean noOverlapping = noOverlapping(entries);
+            if (wholeBucketFilterEnabled()) {
+                entries =
+                        noOverlapping
+                                ? filterWholeBucketPerFile(entries)
+                                : filterWholeBucketAllFiles(entries);
+            }
 
-            if (limitEnabled) {
-                // Apply limit pushdown if enabled
-                if (accumulatedRowCount >= limit) {
-                    // Already reached limit, stop processing remaining buckets
+            if (limitPushdownEnabled() && noOverlapping) {
+                if (currentRowCount.get() >= limit) {
                     break;
                 }
-
-                long remainingLimit = limit - accumulatedRowCount;
-                List<ManifestEntry> processedBucket =
-                        applyLimitPushdownForBucket(processed, remainingLimit);
-                if (processedBucket == null) {
-                    // Can't safely push down limit for this bucket, include all processed entries
-                    result.addAll(processed);
-                } else {
-                    result.addAll(processedBucket);
-                    for (ManifestEntry entry : processedBucket) {
-                        long fileRowCount = entry.file().rowCount();
-                        accumulatedRowCount += fileRowCount;
-                    }
-                }
-            } else {
-                // No limit pushdown, just add processed entries
-                result.addAll(processed);
+                entries = applyLimitWhenNoOverlapping(entries, currentRowCount);
             }
+            result.addAll(entries);
         }
-
         return result;
     }
 
-    private List<ManifestEntry> doFilterWholeBucketByStats(List<ManifestEntry> entries) {
-        return noOverlapping(entries)
-                ? filterWholeBucketPerFile(entries)
-                : filterWholeBucketAllFiles(entries);
+    private List<ManifestEntry> applyLimitWhenNoOverlapping(
+            List<ManifestEntry> entries, AtomicLong currentRowCount) {
+        List<ManifestEntry> result = new ArrayList<>();
+        for (ManifestEntry entry : entries) {
+            boolean hasDeleteRows =
+                    entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
+            if (hasDeleteRows) {
+                return entries;
+            }
+            result.add(entry);
+            long fileRowCount = entry.file().rowCount();
+            currentRowCount.addAndGet(fileRowCount);
+            if (currentRowCount.get() >= limit) {
+                break;
+            }
+        }
+        return result;
     }
 
     private List<ManifestEntry> filterWholeBucketPerFile(List<ManifestEntry> entries) {
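
For PK tables the pruning is per bucket with one counter shared across buckets, and a bucket containing delete rows is kept whole because its merged row count cannot be derived from stats. A self-contained sketch of applyLimitWhenNoOverlapping above, again with FileMeta as a stand-in rather than a Paimon class:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicLong;

    public class PkBucketLimitSketch {

        // Stand-in for ManifestEntry#file(): row count plus optional
        // delete-row count, mirroring DataFileMeta#deleteRowCount.
        static final class FileMeta {
            final long rowCount;
            final Optional<Long> deleteRowCount;

            FileMeta(long rowCount, Optional<Long> deleteRowCount) {
                this.rowCount = rowCount;
                this.deleteRowCount = deleteRowCount;
            }
        }

        // Within one non-overlapping bucket: keep files until the shared
        // counter reaches the limit; bail out and keep the whole bucket if
        // any file carries delete rows.
        static List<FileMeta> pruneBucket(List<FileMeta> bucket, long limit, AtomicLong counter) {
            List<FileMeta> kept = new ArrayList<>();
            for (FileMeta file : bucket) {
                boolean hasDeleteRows = file.deleteRowCount.map(c -> c > 0L).orElse(false);
                if (hasDeleteRows) {
                    return bucket; // merged row count unknown, keep everything
                }
                kept.add(file);
                counter.addAndGet(file.rowCount);
                if (counter.get() >= limit) {
                    break;
                }
            }
            return kept;
        }
    }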
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index f975cf570d..9a9c290304 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -446,7 +446,7 @@ public class KeyValueFileStoreScanTest {
         KeyValueFileStoreScan scan = storePartialUpdate.newScan();
         scan.withSnapshot(snapshot.id()).withLimit(10);
         // supportsLimitPushManifestEntries should return false for PARTIAL_UPDATE
-        assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+        assertThat(scan.limitPushdownEnabled()).isFalse();
 
         // Should read all files since limit pushdown is disabled
         KeyValueFileStoreScan scanWithoutLimit = storePartialUpdate.newScan();
@@ -494,7 +494,7 @@ public class KeyValueFileStoreScanTest {
         KeyValueFileStoreScan scan = storeAggregate.newScan();
         scan.withSnapshot(snapshot.id()).withLimit(10);
         // supportsLimitPushManifestEntries should return false for AGGREGATE
-        assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+        assertThat(scan.limitPushdownEnabled()).isFalse();
 
         // Should read all files since limit pushdown is disabled
         KeyValueFileStoreScan scanWithoutLimit = storeAggregate.newScan();
@@ -538,7 +538,7 @@ public class KeyValueFileStoreScanTest {
         KeyValueFileStoreScan scan = storeWithDV.newScan();
         scan.withLimit(10);
         // supportsLimitPushManifestEntries should return false when deletion vectors are enabled
-        assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+        assertThat(scan.limitPushdownEnabled()).isFalse();
     }
 
     private void runTestExactMatch(
