This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fc86abec18 [core] Refactor row id pushdown to DataEvolutionFileStoreScan
fc86abec18 is described below
commit fc86abec181a036c2a867daf6306adffb5d67808
Author: JingsongLi <[email protected]>
AuthorDate: Sat Dec 27 20:34:54 2025 +0800
[core] Refactor row id pushdown to DataEvolutionFileStoreScan
---
.../shortcodes/generated/core_configuration.html | 6 ---
.../main/java/org/apache/paimon/CoreOptions.java | 12 -----
.../paimon/predicate/RowIdPredicateVisitor.java | 47 +++++++++--------
.../main/java/org/apache/paimon/utils/Range.java | 40 +++++++--------
.../apache/paimon/utils/RoaringNavigableMap64.java | 33 +-----------
.../paimon/globalindex/DataEvolutionBatchScan.java | 42 ++++++++++++++-
.../paimon/operation/AbstractFileStoreScan.java | 6 +--
.../operation/DataEvolutionFileStoreScan.java | 31 ------------
.../apache/paimon/operation/ManifestsReader.java | 50 ++++++++++++++----
.../org/apache/paimon/schema/SchemaValidation.java | 6 ---
.../paimon/table/source/ReadBuilderImpl.java | 54 +-------------------
.../paimon/spark/PaimonBaseScanBuilder.scala | 15 +++---
.../paimon/spark/PaimonBaseScanBuilder.scala | 14 ++---
.../paimon/spark/sql/RowIdPushDownTestBase.scala | 59 +++++++++-------------
14 files changed, 162 insertions(+), 253 deletions(-)
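
Summary of the change: row id pushdown no longer hides behind a `row-id-push-down.enabled` option resolved in ReadBuilderImpl. DataEvolutionBatchScan now derives row ranges from the pushed predicate itself and strips the _ROW_ID leaves from the remaining filter, while ManifestsReader prunes manifests whose row id span cannot intersect those ranges. A minimal sketch of the extraction step; the helper below is illustrative, not repo code, and uses only the visitor API visible in this diff:

    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.RowIdPredicateVisitor;
    import org.apache.paimon.utils.Range;

    import java.util.List;
    import java.util.Optional;

    class RowIdPushDownSketch {
        // Empty means the predicate says nothing about _ROW_ID (no pushdown);
        // a present but empty list means the predicate matches no row at all.
        static Optional<List<Range>> extractRowRanges(Predicate filter) {
            return filter == null ? Optional.empty() : filter.visit(new RowIdPredicateVisitor());
        }
    }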
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 36950aa916..fc7a0c3839 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1037,12 +1037,6 @@ This config option does not affect the default filesystem metastore.</td>
<td>String</td>
            <td>Time field for record level expire. It supports the following types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, `timestamps in milliseconds with BIGINT` or `timestamp`.</td>
</tr>
- <tr>
- <td><h5>row-id-push-down.enabled</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
-            <td>Whether to enable row id push down for scan. Currently, only the data evolution table supports row id push down.</td>
- </tr>
<tr>
<td><h5>row-tracking.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 324608988d..20aa5182d6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2146,14 +2146,6 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to try upgrading the data files after
overwriting a primary key table.");
- public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
- key("row-id-push-down.enabled")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether to enable row id push down for scan."
- + " Currently, only the data evolution
table supports row id push down.");
-
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -3338,10 +3330,6 @@ public class CoreOptions implements Serializable {
return options.get(OVERWRITE_UPGRADE);
}
- public boolean rowIdPushDownEnabled() {
- return options.get(ROW_ID_PUSH_DOWN_ENABLED);
- }
-
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
index d8ae6f6a6d..953737dd9f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.Range;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import static org.apache.paimon.table.SpecialFields.ROW_ID;
@@ -40,10 +41,10 @@ import static org.apache.paimon.table.SpecialFields.ROW_ID;
* AND _ROW_ID IN (1, 2)}).
* </ul>
*/
-public class RowIdPredicateVisitor implements PredicateVisitor<List<Range>> {
+public class RowIdPredicateVisitor implements PredicateVisitor<Optional<List<Range>>> {
@Override
- public List<Range> visit(LeafPredicate predicate) {
+ public Optional<List<Range>> visit(LeafPredicate predicate) {
if (ROW_ID.name().equals(predicate.fieldName())) {
LeafFunction function = predicate.function();
if (function instanceof Equal || function instanceof In) {
@@ -53,57 +54,55 @@ public class RowIdPredicateVisitor implements PredicateVisitor<List<Range>> {
}
// The list output by getRangesFromList is already sorted,
// and has no overlap
- return Range.getRangesFromList(rowIds);
+ return Optional.of(Range.toRanges(rowIds));
}
}
- return null;
+ return Optional.empty();
}
@Override
- public List<Range> visit(CompoundPredicate predicate) {
+ public Optional<List<Range>> visit(CompoundPredicate predicate) {
CompoundPredicate.Function function = predicate.function();
- List<Range> rowIds = null;
+ Optional<List<Range>> rowIds = Optional.empty();
// `And` means we should get the intersection of all children.
if (function instanceof And) {
for (Predicate child : predicate.children()) {
- List<Range> childList = child.visit(this);
- if (childList == null) {
+ Optional<List<Range>> childList = child.visit(this);
+ if (!childList.isPresent()) {
continue;
}
- if (rowIds == null) {
- rowIds = childList;
- } else {
- rowIds = Range.and(rowIds, childList);
- }
+ rowIds =
+                    rowIds.map(ranges -> Optional.of(Range.and(ranges, childList.get())))
+ .orElse(childList);
// shortcut for intersection
- if (rowIds.isEmpty()) {
+ if (rowIds.get().isEmpty()) {
return rowIds;
}
}
} else if (function instanceof Or) {
// `Or` means we should get the union of all children
- rowIds = new ArrayList<>();
+ rowIds = Optional.of(new ArrayList<>());
for (Predicate child : predicate.children()) {
- List<Range> childList = child.visit(this);
- if (childList == null) {
- return null;
+ Optional<List<Range>> childList = child.visit(this);
+ if (!childList.isPresent()) {
+ return Optional.empty();
}
- rowIds.addAll(childList);
- rowIds = Range.sortAndMergeOverlap(rowIds, true);
+ rowIds.get().addAll(childList.get());
+                rowIds = Optional.of(Range.sortAndMergeOverlap(rowIds.get(), true));
}
} else {
- // unexpected function type, just return null
- return null;
+ // unexpected function type, just return empty
+ return Optional.empty();
}
return rowIds;
}
@Override
- public List<Range> visit(TransformPredicate predicate) {
+ public Optional<List<Range>> visit(TransformPredicate predicate) {
// do not support transform predicate now.
- return null;
+ return Optional.empty();
}
}
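
The switch to Optional makes the visitor's three outcomes explicit: empty (no _ROW_ID constraint at all), a non-empty list of ranges, or a present but empty list (contradictory constraint, read nothing). A hedged sketch of the And/Or combination semantics, assuming Range.and computes the intersection and sortAndMergeOverlap merges overlapping ranges, as their use above indicates (the boolean flag mirrors the visitor's own call):

    import org.apache.paimon.utils.Range;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class RangeCombineSketch {
        public static void main(String[] args) {
            // And -> intersection of the children's ranges: the single range [3, 5].
            List<Range> and = Range.and(
                    Arrays.asList(new Range(0L, 5L)),
                    Arrays.asList(new Range(3L, 9L)));
            // Or -> union of the children's ranges, merged: the single range [0, 9].
            List<Range> or = Range.sortAndMergeOverlap(
                    new ArrayList<>(Arrays.asList(new Range(0L, 5L), new Range(3L, 9L))), true);
            System.out.println(and + " / " + or);
        }
    }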
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 7d5afd025f..1cd80bc56b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -22,9 +22,9 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/** Range represents from (inclusive) and to (inclusive). */
public class Range implements Serializable {
@@ -178,31 +178,29 @@ public class Range implements Serializable {
return result;
}
- public static List<Range> getRangesFromList(List<Long> origLongs) {
- if (origLongs == null || origLongs.isEmpty()) {
- return Collections.emptyList();
+ public static List<Range> toRanges(Iterable<Long> ids) {
+ List<Range> ranges = new ArrayList<>();
+ Iterator<Long> iterator = ids.iterator();
+
+ if (!iterator.hasNext()) {
+ return ranges;
}
-        List<Long> longs = origLongs.stream().distinct().sorted().collect(Collectors.toList());
-
- ArrayList<Range> ranges = new ArrayList<>();
- Long rangeStart = null;
- Long rangeEnd = null;
- for (Long cur : longs) {
- if (rangeStart == null) {
- rangeStart = cur;
- rangeEnd = cur;
- } else if (rangeEnd == cur - 1) {
- rangeEnd = cur;
- } else {
+ long rangeStart = iterator.next();
+ long rangeEnd = rangeStart;
+
+ while (iterator.hasNext()) {
+ long current = iterator.next();
+ if (current != rangeEnd + 1) {
+ // Save the current range and start a new one
ranges.add(new Range(rangeStart, rangeEnd));
- rangeStart = cur;
- rangeEnd = cur;
+ rangeStart = current;
}
+ rangeEnd = current;
}
- if (rangeStart != null) {
- ranges.add(new Range(rangeStart, rangeEnd));
- }
+ // Add the last range
+ ranges.add(new Range(rangeStart, rangeEnd));
+
return ranges;
}
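
Note one contract change here: getRangesFromList de-duplicated and sorted its input, while toRanges consumes the iterable as-is, so callers must hand it ids that are already sorted and distinct (the bitmap iterator in the next file guarantees this). A small sketch of the collapsing behavior:

    import org.apache.paimon.utils.Range;

    import java.util.Arrays;
    import java.util.List;

    class ToRangesSketch {
        public static void main(String[] args) {
            // Consecutive ids collapse into inclusive ranges:
            // [1, 3], [7, 8] and [10, 10] for the input below.
            List<Range> ranges = Range.toRanges(Arrays.asList(1L, 2L, 3L, 7L, 8L, 10L));
            System.out.println(ranges);
        }
    }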
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
index f07b7c6afa..bec44f3fb0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
@@ -18,8 +18,6 @@
package org.apache.paimon.utils;
-import org.apache.paimon.annotation.VisibleForTesting;
-
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import java.io.ByteArrayInputStream;
@@ -27,7 +25,6 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -135,42 +132,16 @@ public class RoaringNavigableMap64 implements Iterable<Long> {
}
}
- @VisibleForTesting
/**
* Converts this bitmap to a list of contiguous ranges.
*
     * <p>This is useful for interoperability with APIs that expect List<Range>.
*/
public List<Range> toRangeList() {
- List<Range> ranges = new ArrayList<>();
- Iterator<Long> iterator = roaring64NavigableMap.iterator();
-
- if (!iterator.hasNext()) {
- return ranges;
- }
-
- long rangeStart = iterator.next();
- long rangeEnd = rangeStart;
-
- while (iterator.hasNext()) {
- long current = iterator.next();
- if (current == rangeEnd + 1) {
- // Extend the current range
- rangeEnd = current;
- } else {
- // Save the current range and start a new one
- ranges.add(new Range(rangeStart, rangeEnd));
- rangeStart = current;
- rangeEnd = current;
- }
- }
- // Add the last range
- ranges.add(new Range(rangeStart, rangeEnd));
-
- return ranges;
+        // TODO Optimize this to avoid iterating over all ids
+ return Range.toRanges(roaring64NavigableMap::iterator);
}
- @VisibleForTesting
public static RoaringNavigableMap64 bitmapOf(long... dat) {
RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
for (long ele : dat) {
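
With the duplicated loop gone, toRangeList just adapts the bitmap's ascending iterator to Range.toRanges, which is exactly the sorted, distinct input that method expects. A hedged usage sketch built only from the methods shown above:

    import org.apache.paimon.utils.Range;
    import org.apache.paimon.utils.RoaringNavigableMap64;

    import java.util.List;

    class BitmapRangeSketch {
        public static void main(String[] args) {
            RoaringNavigableMap64 bitmap = RoaringNavigableMap64.bitmapOf(1L, 2L, 3L, 10L);
            // Two ranges come back: [1, 3] and [10, 10].
            List<Range> ranges = bitmap.toRangeList();
            System.out.println(ranges);
        }
    }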
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 6d036d6cb0..586fd61b00 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -24,7 +24,10 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.RowIdPredicateVisitor;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.table.FileStoreTable;
@@ -47,6 +50,7 @@ import java.util.Objects;
import java.util.Optional;
 import static org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
+import static org.apache.paimon.table.SpecialFields.ROW_ID;
/** Scan for data evolution table. */
public class DataEvolutionBatchScan implements DataTableScan {
@@ -71,11 +75,43 @@ public class DataEvolutionBatchScan implements DataTableScan {
@Override
public InnerTableScan withFilter(Predicate predicate) {
+ if (predicate == null) {
+ return this;
+ }
+
+        predicate.visit(new RowIdPredicateVisitor()).ifPresent(this::withRowRanges);
+ predicate = removeRowIdFilter(predicate);
this.filter = predicate;
batchScan.withFilter(predicate);
return this;
}
+ private Predicate removeRowIdFilter(Predicate filter) {
+ if (filter instanceof LeafPredicate
+                && ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) {
+ return null;
+ } else if (filter instanceof CompoundPredicate) {
+ CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
+
+ List<Predicate> newChildren = new ArrayList<>();
+ for (Predicate child : compoundPredicate.children()) {
+ Predicate newChild = removeRowIdFilter(child);
+ if (newChild != null) {
+ newChildren.add(newChild);
+ }
+ }
+
+ if (newChildren.isEmpty()) {
+ return null;
+ } else if (newChildren.size() == 1) {
+ return newChildren.get(0);
+ } else {
+                return new CompoundPredicate(compoundPredicate.function(), newChildren);
+ }
+ }
+ return filter;
+ }
+
@Override
public InnerTableScan withVectorSearch(VectorSearch vectorSearch) {
this.vectorSearch = vectorSearch;
@@ -157,9 +193,13 @@ public class DataEvolutionBatchScan implements DataTableScan {
@Override
public InnerTableScan withRowRanges(List<Range> rowRanges) {
+ if (rowRanges == null) {
+ return this;
+ }
+
this.pushedRowRanges = rowRanges;
if (globalIndexResult != null) {
- throw new IllegalStateException("");
+            throw new IllegalStateException("Cannot push row ranges after global index eval.");
}
return this;
}
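
withFilter now does both halves in one pass: the visitor captures the row ranges, and removeRowIdFilter prunes the _ROW_ID leaves so only the residual predicate reaches the underlying scan. A hedged sketch of what that extraction yields; the two-column row type and the PredicateBuilder usage are illustrative assumptions, not taken from this diff:

    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.PredicateBuilder;
    import org.apache.paimon.predicate.RowIdPredicateVisitor;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;
    import org.apache.paimon.utils.Range;

    import java.util.List;
    import java.util.Optional;

    class WithFilterSketch {
        public static void main(String[] args) {
            // Assumed schema: one data column plus the _ROW_ID meta column.
            RowType rowType = RowType.of(
                    new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
                    new String[] {"a", "_ROW_ID"});
            PredicateBuilder builder = new PredicateBuilder(rowType);
            Predicate filter = PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 5L));
            // The visitor yields the single range [5, 5];
            // removeRowIdFilter then leaves only `a = 1` as the residual filter.
            Optional<List<Range>> ranges = filter.visit(new RowIdPredicateVisitor());
            System.out.println(ranges);
        }
    }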
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index d261b050b6..284a2ef195 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -243,6 +243,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
@Override
public FileStoreScan withRowRanges(List<Range> rowRanges) {
this.rowRanges = rowRanges;
+ manifestsReader.withRowRanges(rowRanges);
return this;
}
@@ -274,7 +275,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
ManifestsReader.Result manifestsResult = readManifests();
Snapshot snapshot = manifestsResult.snapshot;
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
- manifests = postFilterManifests(manifests);
         Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
if (supportsLimitPushManifestEntries()) {
@@ -434,10 +434,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
/** Note: Keep this thread-safe. */
protected abstract boolean filterByStats(ManifestEntry entry);
-    protected List<ManifestFileMeta> postFilterManifests(List<ManifestFileMeta> manifests) {
- return manifests;
- }
-
protected boolean postFilterManifestEntriesEnabled() {
return false;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 8f31cb7774..c2695ce682 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.DataEvolutionArray;
import org.apache.paimon.reader.DataEvolutionRow;
@@ -103,36 +102,6 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
return this;
}
- @Override
-    protected List<ManifestFileMeta> postFilterManifests(List<ManifestFileMeta> manifests) {
- if (rowRanges == null || rowRanges.isEmpty()) {
- return manifests;
- }
-        return manifests.stream().filter(this::filterManifestByRowIds).collect(Collectors.toList());
- }
-
- private boolean filterManifestByRowIds(ManifestFileMeta manifest) {
- if (rowRanges == null || rowRanges.isEmpty()) {
- return true;
- }
-
- Long min = manifest.minRowId();
- Long max = manifest.maxRowId();
- if (min == null || max == null) {
- return true;
- }
-
- Range manifestRowRange = new Range(min, max);
-
- for (Range expected : rowRanges) {
- if (Range.intersection(manifestRowRange, expected) != null) {
- return true;
- }
- }
-
- return false;
- }
-
@Override
public FileStoreScan withReadType(RowType readType) {
if (readType != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index b3b89e72aa..fed68d6997 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -28,6 +28,7 @@ import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -54,6 +55,7 @@ public class ManifestsReader {
@Nullable private Integer specifiedLevel = null;
@Nullable private PartitionPredicate partitionFilter = null;
@Nullable private BiFilter<Integer, Integer> levelMinMaxFilter = null;
+ @Nullable protected List<Range> rowRanges;
public ManifestsReader(
RowType partitionType,
@@ -106,6 +108,11 @@ public class ManifestsReader {
return this;
}
+ public ManifestsReader withRowRanges(List<Range> rowRanges) {
+ this.rowRanges = rowRanges;
+ return this;
+ }
+
@Nullable
public PartitionPredicate partitionFilter() {
return partitionFilter;
@@ -146,6 +153,26 @@ public class ManifestsReader {
}
}
+ private boolean filterManifestByRowRanges(ManifestFileMeta manifest) {
+ if (rowRanges == null) {
+ return true;
+ }
+ Long min = manifest.minRowId();
+ Long max = manifest.maxRowId();
+ if (min == null || max == null) {
+ return true;
+ }
+
+ Range manifestRowRange = new Range(min, max);
+
+ for (Range expected : rowRanges) {
+ if (Range.intersection(manifestRowRange, expected) != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/** Note: Keep this thread-safe. */
private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
Integer minBucket = manifest.minBucket();
@@ -172,17 +199,22 @@ public class ManifestsReader {
}
}
- if (partitionFilter == null) {
- return true;
+ if (partitionFilter != null) {
+ SimpleStats stats = manifest.partitionStats();
+ if (!partitionFilter.test(
+ manifest.numAddedFiles() + manifest.numDeletedFiles(),
+ stats.minValues(),
+ stats.maxValues(),
+ stats.nullCounts())) {
+ return false;
+ }
+ }
+
+ if (!filterManifestByRowRanges(manifest)) {
+ return false;
}
- SimpleStats stats = manifest.partitionStats();
- return partitionFilter == null
- || partitionFilter.test(
- manifest.numAddedFiles() + manifest.numDeletedFiles(),
- stats.minValues(),
- stats.maxValues(),
- stats.nullCounts());
+ return true;
}
/** Result for reading manifest files. */
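
The manifest-level rule is plain interval overlap: a manifest survives unless its [minRowId, maxRowId] statistics prove it disjoint from every requested range, and manifests without row id stats are kept conservatively. A hedged restatement (overlapsAny is an illustrative helper, not repo code):

    import org.apache.paimon.utils.Range;

    import java.util.List;

    class ManifestPruneSketch {
        // Keep the manifest iff its row id span intersects any requested range.
        static boolean overlapsAny(long minRowId, long maxRowId, List<Range> requested) {
            Range span = new Range(minRowId, maxRowId);
            for (Range r : requested) {
                if (Range.intersection(span, r) != null) {
                    return true;
                }
            }
            return false;
        }
    }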
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index d78bba4e8f..abbc2e4912 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -653,12 +653,6 @@ public class SchemaValidation {
"Data evolution config must disabled with
deletion-vectors.enabled");
}
- if (options.rowIdPushDownEnabled()) {
- checkArgument(
- options.dataEvolutionEnabled(),
- "Row id push down config must enabled with
data-evolution.enabled");
- }
-
         List<String> blobNames = BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldNames();
if (!blobNames.isEmpty()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 48765b7c50..c81dfd8e01 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -19,15 +19,11 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.data.variant.VariantAccessInfoUtils;
import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.RowIdPredicateVisitor;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.table.InnerTable;
@@ -37,14 +33,12 @@ import org.apache.paimon.utils.Range;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
-import static org.apache.paimon.table.SpecialFields.ROW_ID;
import static org.apache.paimon.utils.Preconditions.checkState;
/** Implementation for {@link ReadBuilder}. */
@@ -71,7 +65,7 @@ public class ReadBuilderImpl implements ReadBuilder {
private @Nullable RowType readType;
private @Nullable VariantAccessInfo[] variantAccessInfo;
- public @Nullable @VisibleForTesting List<Range> rowRanges;
+ private @Nullable List<Range> rowRanges;
private @Nullable VectorSearch vectorSearch;
private boolean dropStats = false;
@@ -105,55 +99,9 @@ public class ReadBuilderImpl implements ReadBuilder {
} else {
this.filter = PredicateBuilder.and(this.filter, filter);
}
- calculateRowRanges(this.filter);
- this.filter = removeRowIdFilter(this.filter);
return this;
}
- private void calculateRowRanges(Predicate filter) {
- if (filter == null) {
- return;
- }
-
- RowIdPredicateVisitor visitor = new RowIdPredicateVisitor();
- List<Range> ranges = filter.visit(visitor);
- // When rowRanges is not null, filter data based on rowRanges.
- // If rowRanges is empty, it means no data will be read.
- if (ranges != null) {
- withRowRanges(ranges);
- }
- }
-
- private Predicate removeRowIdFilter(Predicate filter) {
- if (filter == null) {
- return null;
- }
-
- if (filter instanceof LeafPredicate
-                && ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) {
- return null;
- } else if (filter instanceof CompoundPredicate) {
- CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
-
- List<Predicate> newChildren = new ArrayList<>();
- for (Predicate child : compoundPredicate.children()) {
- Predicate newChild = removeRowIdFilter(child);
- if (newChild != null) {
- newChildren.add(newChild);
- }
- }
-
- if (newChildren.isEmpty()) {
- return null;
- } else if (newChildren.size() == 1) {
- return newChildren.get(0);
- } else {
-                return new CompoundPredicate(compoundPredicate.function(), newChildren);
- }
- }
- return filter;
- }
-
@Override
public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
if (partitionSpec != null) {
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 06b826b72a..41a1d552f1 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -22,15 +22,15 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
-import org.apache.paimon.table.{InnerTable, Table}
-import org.apache.paimon.table.SpecialFields.ROW_ID
-import org.apache.paimon.types.{DataField, DataTypes, RowType}
+import org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking
+import org.apache.paimon.table.Table
+import org.apache.paimon.types.RowType
 import org.apache.spark.sql.connector.read.{SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
-import java.util.{ArrayList, List => JList}
+import java.util.{List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -69,11 +69,8 @@ abstract class PaimonBaseScanBuilder
val postScan = mutable.ArrayBuffer.empty[Filter]
var newRowType = rowType
- if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
- val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
- dataFieldsWithRowId.add(
-        new DataField(rowType.getFieldCount, ROW_ID.name(), DataTypes.BIGINT()))
- newRowType = rowType.copy(dataFieldsWithRowId)
+    if (coreOptions.rowTrackingEnabled() && coreOptions.dataEvolutionEnabled()) {
+ newRowType = rowTypeWithRowTracking(newRowType);
}
val converter = new SparkFilterConverter(newRowType)
     val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 4ac5dd051b..8179f504b3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -22,15 +22,14 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN}
-import org.apache.paimon.table.{InnerTable, Table}
-import org.apache.paimon.table.SpecialFields.ROW_ID
-import org.apache.paimon.types.{DataField, DataTypes, RowType}
+import org.apache.paimon.table.{SpecialFields, Table}
+import org.apache.paimon.types.RowType
 import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
 import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
import org.apache.spark.sql.types.StructType
-import java.util.{ArrayList, List => JList}
+import java.util.{List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -67,11 +66,8 @@ abstract class PaimonBaseScanBuilder
val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
var newRowType = rowType
- if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
- val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
- dataFieldsWithRowId.add(
-        new DataField(rowType.getFieldCount, ROW_ID.name(), DataTypes.BIGINT()))
- newRowType = rowType.copy(dataFieldsWithRowId)
+    if (coreOptions.rowTrackingEnabled() && coreOptions.dataEvolutionEnabled()) {
+ newRowType = SpecialFields.rowTypeWithRowTracking(newRowType);
}
val converter = SparkV2FilterConverter(newRowType)
     val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
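
Both Spark scan builders now take the same shortcut: rather than gating on a dedicated option, they append the row-tracking meta columns whenever row tracking and data evolution are both enabled, so _ROW_ID filters survive Spark filter conversion. A hedged sketch of that schema step (rowTypeWithRowTracking's RowType-to-RowType signature is inferred from its use above; the column layout is illustrative):

    import org.apache.paimon.table.SpecialFields;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    class RowTrackingSchemaSketch {
        public static void main(String[] args) {
            RowType dataType = RowType.of(
                    new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                    new String[] {"a", "c"});
            // Appends the row-tracking meta columns (including _ROW_ID) to the read schema.
            RowType withTracking = SpecialFields.rowTypeWithRowTracking(dataType);
            System.out.println(withTracking);
        }
    }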
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
index 1e9f2d11b5..bb1d3e8717 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
@@ -32,61 +32,48 @@ class RowIdPushDownTestBase extends PaimonSparkTestBase {
withTable("t") {
sql("CREATE TABLE t (a INT, b INT, c STRING) TBLPROPERTIES " +
"('row-tracking.enabled'='true', 'data-evolution.enabled'='true',
'row-id-push-down.enabled'='true')")
- sql("INSERT INTO t VALUES (1, 1, '1'), (2, 2, '2'), (3, 3, '3'), (4, 4,
'4')")
+
+ // first manifest
+ sql("INSERT INTO t VALUES (0, 0, '0'), (1, 1, '1'), (2, 2, '2'), (3, 3,
'3')")
+
+ // second manifest
+ sql("INSERT INTO t VALUES (4, 4, '4'), (5, 5, '5')")
+
+ // delete second manifest
+ // after push down, should never read it
+ val table = loadTable("t")
+ val manifests =
+        table.store().manifestListFactory().create().readAllManifests(table.latestSnapshot().get)
+ val secondManifest = manifests.asScala.find(m => m.minRowId() == 4L).get
+    table.store().manifestFileFactory().create().delete(secondManifest.fileName())
// 1.LeafPredicate
- assertResult(Seq(new Range(0L, 0L)))(
- getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0").readBuilder
- .asInstanceOf[ReadBuilderImpl]
- .rowRanges
- .asScala)
checkAnswer(
sql("SELECT * FROM t WHERE _ROW_ID = 0"),
- Seq(Row(1, 1, "1"))
+ Seq(Row(0, 0, "0"))
)
- assertResult(Seq(new Range(0L, 1L), new Range(3L, 3L)))(
- getPaimonScan("SELECT * FROM t WHERE _ROW_ID IN (0, 1, 3)").readBuilder
- .asInstanceOf[ReadBuilderImpl]
- .rowRanges
- .asScala)
checkAnswer(
sql("SELECT * FROM t WHERE _ROW_ID IN (0, 1, 3)"),
- Seq(Row(1, 1, "1"), Row(2, 2, "2"), Row(4, 4, "4"))
+ Seq(Row(0, 0, "0"), Row(1, 1, "1"), Row(3, 3, "3"))
)
- assertResult(Seq(new Range(4L, 5L)))(
- getPaimonScan("SELECT * FROM t WHERE _ROW_ID IN (4, 5)").readBuilder
- .asInstanceOf[ReadBuilderImpl]
- .rowRanges
- .asScala)
checkAnswer(
- sql("SELECT * FROM t WHERE _ROW_ID IN (4, 5)"),
+ sql("SELECT * FROM t WHERE _ROW_ID IN (1, 6)"),
+ Seq(Row(1, 1, "1"))
+ )
+ checkAnswer(
+ sql("SELECT * FROM t WHERE _ROW_ID IN (6, 7)"),
Seq()
)
// 2.CompoundPredicate
- assertResult(Seq(new Range(0, 0)))(
- getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (0,
1)").readBuilder
- .asInstanceOf[ReadBuilderImpl]
- .rowRanges
- .asScala)
checkAnswer(
sql("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (0, 1)"),
- Seq(Row(1, 1, "1"))
+ Seq(Row(0, 0, "0"))
)
- assertResult(Seq(new Range(0, 2)))(
- getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 OR _ROW_ID IN (1,
2)").readBuilder
- .asInstanceOf[ReadBuilderImpl]
- .rowRanges
- .asScala)
checkAnswer(
sql("SELECT * FROM t WHERE _ROW_ID = 0 OR _ROW_ID IN (1, 2)"),
- Seq(Row(1, 1, "1"), Row(2, 2, "2"), Row(3, 3, "3"))
+ Seq(Row(0, 0, "0"), Row(1, 1, "1"), Row(2, 2, "2"))
)
- assertResult(Seq())(
- getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (1,
2)").readBuilder
- .asInstanceOf[ReadBuilderImpl]
- .rowRanges
- .asScala)
checkAnswer(
sql("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (1, 2)"),
Seq()