This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c554d9dc4e [core] Refactor limit pushdown in KeyValueFileStoreScan
c554d9dc4e is described below
commit c554d9dc4e102a490cc5cadb863b95b972aef39d
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jan 16 15:18:15 2026 +0800
[core] Refactor limit pushdown in KeyValueFileStoreScan
---
.../paimon/operation/AbstractFileStoreScan.java | 8 -
.../paimon/operation/AppendOnlyFileStoreScan.java | 80 +--------
.../operation/DataEvolutionFileStoreScan.java | 5 -
.../paimon/operation/KeyValueFileStoreScan.java | 190 +++++----------------
.../operation/KeyValueFileStoreScanTest.java | 6 +-
5 files changed, 53 insertions(+), 236 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index fdefcd0d35..aa3f1ccf7a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -444,14 +444,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
return false;
}
- protected boolean supportsLimitPushManifestEntries() {
- return false;
- }
-
- protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
- throw new UnsupportedOperationException();
- }
-
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
throw new UnsupportedOperationException();
}
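
The hunk above removes the limit-specific hooks from the base class; post-filtering
now goes through a single pair of hooks. A minimal, self-contained sketch of that
template-method pattern (BaseScan, plan(), and the String entries are hypothetical
stand-ins for AbstractFileStoreScan and ManifestEntry, not Paimon classes):

    import java.util.List;

    abstract class BaseScan {
        // Subclasses return true to opt in to post-filtering of manifest entries.
        protected boolean postFilterManifestEntriesEnabled() {
            return false;
        }

        // Invoked only when a subclass opted in; the base class has no default behavior.
        protected List<String> postFilterManifestEntries(List<String> entries) {
            throw new UnsupportedOperationException();
        }

        // Call site: apply the hook only when the subclass enables it.
        public List<String> plan(List<String> entries) {
            return postFilterManifestEntriesEnabled()
                    ? postFilterManifestEntries(entries)
                    : entries;
        }
    }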
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index bf7504a5fb..49071045be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -34,7 +34,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -88,25 +87,22 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
return this;
}
- @Override
- public boolean supportsLimitPushManifestEntries() {
- return limit != null && limit > 0 && !deletionVectorsEnabled;
- }
-
@Override
protected boolean postFilterManifestEntriesEnabled() {
- return supportsLimitPushManifestEntries();
+ return limit != null && limit > 0 && !deletionVectorsEnabled;
}
@Override
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled);
- // Use LimitAwareManifestEntryIterator for limit pushdown
- Iterator<ManifestEntry> iterator =
- new LimitAwareManifestEntryIterator(entries.iterator(), limit);
List<ManifestEntry> result = new ArrayList<>();
- while (iterator.hasNext()) {
- result.add(iterator.next());
+ long accumulatedRowCount = 0;
+ for (ManifestEntry entry : entries) {
+ result.add(entry);
+ accumulatedRowCount += entry.file().rowCount();
+ if (accumulatedRowCount >= limit) {
+ break;
+ }
}
return result;
}
@@ -173,64 +169,4 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
throw new RuntimeException("Exception happens while checking predicate.", e);
}
}
-
- /**
- * Iterator that applies limit pushdown by stopping early when enough rows have been
- * accumulated.
- */
- private static class LimitAwareManifestEntryIterator implements Iterator<ManifestEntry> {
- private final Iterator<ManifestEntry> baseIterator;
- private final long limit;
-
- private long accumulatedRowCount = 0;
- private ManifestEntry nextEntry = null;
- private boolean hasNext = false;
-
- LimitAwareManifestEntryIterator(Iterator<ManifestEntry> baseIterator, long limit) {
- this.baseIterator = baseIterator;
- this.limit = limit;
- advance();
- }
-
- private void advance() {
- // If we've already accumulated enough rows, stop reading more entries
- if (accumulatedRowCount >= limit) {
- hasNext = false;
- nextEntry = null;
- return;
- }
-
- if (baseIterator.hasNext()) {
- nextEntry = baseIterator.next();
- hasNext = true;
-
- long fileRowCount = nextEntry.file().rowCount();
- if (fileRowCount > 0) {
- accumulatedRowCount += fileRowCount;
- }
-
- return;
- }
-
- // No more base entries
- hasNext = false;
- nextEntry = null;
- }
-
- @Override
- public boolean hasNext() {
- return hasNext;
- }
-
- @Override
- public ManifestEntry next() {
- // This exception is only thrown if next() is called when hasNext() returns false.
- if (!hasNext) {
- throw new java.util.NoSuchElementException();
- }
- ManifestEntry current = nextEntry;
- advance();
- return current;
- }
- }
}
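
The removed LimitAwareManifestEntryIterator is replaced by the inline loop in
postFilterManifestEntries above. A minimal sketch of that accumulate-until-limit
pattern (FileEntry is a hypothetical stand-in for ManifestEntry; rowCount() mirrors
entry.file().rowCount()):

    import java.util.ArrayList;
    import java.util.List;

    final class LimitSketch {
        // Hypothetical stand-in for ManifestEntry.
        record FileEntry(String fileName, long rowCount) {}

        // Keep entries until their accumulated row count reaches the limit, then stop early.
        static List<FileEntry> applyLimit(List<FileEntry> entries, long limit) {
            List<FileEntry> result = new ArrayList<>();
            long accumulatedRowCount = 0;
            for (FileEntry entry : entries) {
                result.add(entry);
                accumulatedRowCount += entry.rowCount();
                if (accumulatedRowCount >= limit) {
                    break;
                }
            }
            return result;
        }
    }

With three files of 100 rows each and a limit of 250, the sketch keeps all three
files (counts 100, 200, then 300 >= 250); with a limit of 150 it keeps only the
first two.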
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index c2695ce682..db9e5db807 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -116,11 +116,6 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
return this;
}
- @Override
- public boolean supportsLimitPushManifestEntries() {
- return false;
- }
-
@Override
protected boolean postFilterManifestEntriesEnabled() {
return inputFilter != null;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 2ac05b7385..a09fb25ee6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
@@ -38,9 +39,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
import java.io.IOException;
@@ -50,7 +48,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
@@ -59,8 +57,6 @@ import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
public class KeyValueFileStoreScan extends AbstractFileStoreScan {
- private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class);
-
private final SimpleStatsEvolutions fieldKeyStatsConverters;
private final SimpleStatsEvolutions fieldValueStatsConverters;
private final BucketSelectConverter bucketSelectConverter;
@@ -209,168 +205,66 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
}
}
- /**
- * Check if limit pushdown is supported for PK tables.
- *
- * <p>Not supported when merge engine is PARTIAL_UPDATE/AGGREGATE (need merge) or deletion
- * vectors are enabled (can't count deleted rows). For DEDUPLICATE/FIRST_ROW, per-bucket checks
- * (no overlapping, no delete rows) are done in applyLimitPushdownForBucket.
- */
@Override
- public boolean supportsLimitPushManifestEntries() {
- if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) {
- return false;
- }
-
- return limit != null && limit > 0 && !deletionVectorsEnabled;
+ protected boolean postFilterManifestEntriesEnabled() {
+ return wholeBucketFilterEnabled() || limitPushdownEnabled();
}
- /**
- * Apply limit pushdown for a single bucket. Returns files to include, or null if unsafe.
- *
- * <p>Returns null if files overlap (LSM level 0 or different levels) or have delete rows. For
- * non-overlapping files with no delete rows, accumulates row counts until limit is reached.
- *
- * @param bucketEntries files in the same bucket
- * @param limit the limit to apply
- * @return files to include, or null if we can't safely push down limit
- */
- @Nullable
- private List<ManifestEntry> applyLimitPushdownForBucket(
- List<ManifestEntry> bucketEntries, long limit) {
- // Check if this bucket has overlapping files (LSM property)
- boolean hasOverlapping = !noOverlapping(bucketEntries);
-
- if (hasOverlapping) {
- // For buckets with overlapping, we can't safely push down limit because files
- // need to be merged and we can't accurately calculate the merged row count.
- return null;
- }
-
- // For buckets without overlapping and with merge engines that don't require
- // merge (DEDUPLICATE or FIRST_ROW), we can safely accumulate row count
- // and stop when limit is reached, but only if files have no delete rows.
- List<ManifestEntry> result = new ArrayList<>();
- long accumulatedRowCount = 0;
-
- for (ManifestEntry entry : bucketEntries) {
- long fileRowCount = entry.file().rowCount();
- // Check if file has delete rows - if so, we can't accurately calculate
- // the merged row count, so we need to stop limit pushdown
- boolean hasDeleteRows =
- entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
-
- if (hasDeleteRows) {
- // If file has delete rows, we can't accurately calculate merged row count
- // without reading the actual data. Can't safely push down limit.
- return null;
- }
+ private boolean wholeBucketFilterEnabled() {
+ return valueFilter != null && scanMode == ScanMode.ALL;
+ }
- // File has no delete rows, no overlapping, and merge engine doesn't require merge.
- // Safe to count rows.
- result.add(entry);
- accumulatedRowCount += fileRowCount;
- if (accumulatedRowCount >= limit) {
- break;
- }
+ @VisibleForTesting
+ public boolean limitPushdownEnabled() {
+ if (limit == null || limit <= 0) {
+ return false;
}
- return result;
- }
-
- @Override
- protected boolean postFilterManifestEntriesEnabled() {
- return (valueFilter != null && scanMode == ScanMode.ALL)
- || supportsLimitPushManifestEntries();
+ return mergeEngine != PARTIAL_UPDATE && mergeEngine != AGGREGATE && !deletionVectorsEnabled;
}
@Override
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> files) {
- long startTime = System.nanoTime();
Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);
-
- // Apply filter if valueFilter is enabled, otherwise use identity function
- Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor =
- (valueFilter != null && scanMode == ScanMode.ALL)
- ? this::doFilterWholeBucketByStats
- : Function.identity();
-
- // Apply filter (if enabled) and limit pushdown (if enabled)
- boolean limitEnabled = supportsLimitPushManifestEntries();
- List<ManifestEntry> result =
- applyLimitPushdownToBuckets(buckets, bucketProcessor, limitEnabled);
-
- if (limitEnabled) {
- long duration = (System.nanoTime() - startTime) / 1_000_000;
- LOG.info(
- "Limit pushdown for PK table completed in {} ms. Limit:
{}, InputFiles: {}, OutputFiles: {}, "
- + "MergeEngine: {}, ScanMode: {},
DeletionVectorsEnabled: {}",
- duration,
- limit,
- files.size(),
- result.size(),
- mergeEngine,
- scanMode,
- deletionVectorsEnabled);
- }
-
- return result;
- }
-
- /**
- * Apply limit pushdown to buckets with an optional bucket processor (e.g., filtering).
- *
- * <p>This method processes buckets in order, applying the bucket processor first, then applying
- * limit pushdown if enabled. It stops early when the limit is reached.
- *
- * @param buckets buckets grouped by (partition, bucket)
- * @param bucketProcessor processor to apply to each bucket before limit pushdown
- * @return processed entries (filtered and limited if limit is enabled)
- */
- private List<ManifestEntry> applyLimitPushdownToBuckets(
- Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets,
- Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor,
- boolean limitEnabled) {
List<ManifestEntry> result = new ArrayList<>();
- long accumulatedRowCount = 0;
-
- for (List<ManifestEntry> bucketEntries : buckets.values()) {
- // Apply bucket processor (e.g., filtering)
- List<ManifestEntry> processed = bucketProcessor.apply(bucketEntries);
+ AtomicLong currentRowCount = new AtomicLong(0);
+ for (List<ManifestEntry> entries : buckets.values()) {
+ boolean noOverlapping = noOverlapping(entries);
+ if (wholeBucketFilterEnabled()) {
+ entries =
+ noOverlapping
+ ? filterWholeBucketPerFile(entries)
+ : filterWholeBucketAllFiles(entries);
+ }
- if (limitEnabled) {
- // Apply limit pushdown if enabled
- if (accumulatedRowCount >= limit) {
- // Already reached limit, stop processing remaining buckets
+ if (limitPushdownEnabled() && noOverlapping) {
+ if (currentRowCount.get() >= limit) {
break;
}
-
- long remainingLimit = limit - accumulatedRowCount;
- List<ManifestEntry> processedBucket =
- applyLimitPushdownForBucket(processed, remainingLimit);
- if (processedBucket == null) {
- // Can't safely push down limit for this bucket, include all processed entries
- result.addAll(processed);
- } else {
- result.addAll(processedBucket);
- for (ManifestEntry entry : processedBucket) {
- long fileRowCount = entry.file().rowCount();
- accumulatedRowCount += fileRowCount;
- }
- }
- } else {
- // No limit pushdown, just add processed entries
- result.addAll(processed);
+ entries = applyLimitWhenNoOverlapping(entries, currentRowCount);
}
+ result.addAll(entries);
}
-
return result;
}
- private List<ManifestEntry> doFilterWholeBucketByStats(List<ManifestEntry>
entries) {
- return noOverlapping(entries)
- ? filterWholeBucketPerFile(entries)
- : filterWholeBucketAllFiles(entries);
+ private List<ManifestEntry> applyLimitWhenNoOverlapping(
+ List<ManifestEntry> entries, AtomicLong currentRowCount) {
+ List<ManifestEntry> result = new ArrayList<>();
+ for (ManifestEntry entry : entries) {
+ boolean hasDeleteRows =
+ entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
+ if (hasDeleteRows) {
+ return entries;
+ }
+ result.add(entry);
+ long fileRowCount = entry.file().rowCount();
+ currentRowCount.addAndGet(fileRowCount);
+ if (currentRowCount.get() >= limit) {
+ break;
+ }
+ }
+ return result;
}
private List<ManifestEntry> filterWholeBucketPerFile(List<ManifestEntry> entries) {
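
A condensed sketch of the refactored per-bucket flow above (Entry and the boolean
parameters are hypothetical stand-ins; the real code groups ManifestEntry by
(partition, bucket), runs the whole-bucket value filter first, and checks LSM levels
for overlap). The limit is applied only to non-overlapping buckets, and a bucket
containing delete rows is kept whole because its merged row count is inexact:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    final class BucketLimitSketch {
        // Hypothetical stand-in for ManifestEntry.
        record Entry(long rowCount, long deleteRowCount) {}

        static List<Entry> postFilter(
                List<List<Entry>> buckets, long limit, boolean limitPushdownEnabled) {
            List<Entry> result = new ArrayList<>();
            AtomicLong currentRowCount = new AtomicLong(0);
            for (List<Entry> entries : buckets) {
                if (limitPushdownEnabled && noOverlapping(entries)) {
                    if (currentRowCount.get() >= limit) {
                        break; // earlier buckets already cover the limit
                    }
                    entries = applyLimitWhenNoOverlapping(entries, limit, currentRowCount);
                }
                result.addAll(entries);
            }
            return result;
        }

        static List<Entry> applyLimitWhenNoOverlapping(
                List<Entry> entries, long limit, AtomicLong currentRowCount) {
            List<Entry> kept = new ArrayList<>();
            for (Entry entry : entries) {
                if (entry.deleteRowCount() > 0) {
                    return entries; // delete rows: keep the whole bucket, no pushdown
                }
                kept.add(entry);
                if (currentRowCount.addAndGet(entry.rowCount()) >= limit) {
                    break; // enough rows accumulated across buckets
                }
            }
            return kept;
        }

        // Placeholder: the real check inspects the LSM levels of the files in the bucket.
        static boolean noOverlapping(List<Entry> entries) {
            return entries.size() <= 1;
        }
    }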
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index f975cf570d..9a9c290304 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -446,7 +446,7 @@ public class KeyValueFileStoreScanTest {
KeyValueFileStoreScan scan = storePartialUpdate.newScan();
scan.withSnapshot(snapshot.id()).withLimit(10);
// supportsLimitPushManifestEntries should return false for PARTIAL_UPDATE
- assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+ assertThat(scan.limitPushdownEnabled()).isFalse();
// Should read all files since limit pushdown is disabled
KeyValueFileStoreScan scanWithoutLimit = storePartialUpdate.newScan();
@@ -494,7 +494,7 @@ public class KeyValueFileStoreScanTest {
KeyValueFileStoreScan scan = storeAggregate.newScan();
scan.withSnapshot(snapshot.id()).withLimit(10);
// supportsLimitPushManifestEntries should return false for AGGREGATE
- assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+ assertThat(scan.limitPushdownEnabled()).isFalse();
// Should read all files since limit pushdown is disabled
KeyValueFileStoreScan scanWithoutLimit = storeAggregate.newScan();
@@ -538,7 +538,7 @@ public class KeyValueFileStoreScanTest {
KeyValueFileStoreScan scan = storeWithDV.newScan();
scan.withLimit(10);
// supportsLimitPushManifestEntries should return false when deletion vectors are enabled
- assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+ assertThat(scan.limitPushdownEnabled()).isFalse();
}
private void runTestExactMatch(