This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b6d2302df3 [spark] paimon-spark supports row id push down (#6697)
b6d2302df3 is described below
commit b6d2302df303af6eca1e0e333bfb48ab08ee4e0c
Author: Xiao Zhu <[email protected]>
AuthorDate: Sat Dec 27 22:39:21 2025 +0800
[spark] paimon-spark supports row id push down (#6697)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 12 +++
.../paimon/predicate/RowIdPredicateVisitor.java | 109 +++++++++++++++++++++
.../main/java/org/apache/paimon/utils/Range.java | 29 ++++++
.../org/apache/paimon/schema/SchemaValidation.java | 6 ++
.../paimon/table/source/ReadBuilderImpl.java | 54 +++++++++-
.../paimon/spark/PaimonBaseScanBuilder.scala | 18 +++-
.../paimon/spark/sql/RowIdPushDownTest.scala | 21 ++++
.../paimon/spark/sql/RowIdPushDownTest.scala | 21 ++++
.../paimon/spark/sql/RowIdPushDownTest.scala | 21 ++++
.../paimon/spark/sql/RowIdPushDownTest.scala | 21 ++++
.../paimon/spark/sql/RowIdPushDownTest.scala | 21 ++++
.../paimon/spark/PaimonBaseScanBuilder.scala | 18 +++-
.../paimon/spark/sql/RowIdPushDownTestBase.scala | 96 ++++++++++++++++++
14 files changed, 444 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index fc7a0c3839..36950aa916 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1037,6 +1037,12 @@ This config option does not affect the default filesystem metastore.</td>
<td>String</td>
            <td>Time field for record level expire. It supports the following types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, `timestamps in milliseconds with BIGINT` or `timestamp`.</td>
</tr>
+ <tr>
+ <td><h5>row-id-push-down.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+            <td>Whether to enable row id push down for scan. Currently, only the data evolution table supports row id push down.</td>
+ </tr>
<tr>
<td><h5>row-tracking.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 20aa5182d6..324608988d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2146,6 +2146,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to try upgrading the data files after
overwriting a primary key table.");
+ public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
+ key("row-id-push-down.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable row id push down for scan."
+                                        + " Currently, only the data evolution table supports row id push down.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -3330,6 +3338,10 @@ public class CoreOptions implements Serializable {
return options.get(OVERWRITE_UPGRADE);
}
+ public boolean rowIdPushDownEnabled() {
+ return options.get(ROW_ID_PUSH_DOWN_ENABLED);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
new file mode 100644
index 0000000000..d8ae6f6a6d
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.utils.Range;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.table.SpecialFields.ROW_ID;
+
+/**
+ * The {@link PredicateVisitor} to extract a list of row id ranges from predicates. The returned row
+ * id ranges can be pushed down to manifest readers and file readers to enable efficient random
+ * access.
+ *
+ * <p>Note that there is a significant distinction between returning {@code null} and returning an
+ * empty list:
+ *
+ * <ul>
+ *   <li>{@code null} indicates that the predicate cannot be converted into a random-access pattern,
+ *       meaning the filter is not consumable by this visitor.
+ *   <li>An empty list indicates that no rows satisfy the predicate (e.g. {@code WHERE _ROW_ID = 3
+ *       AND _ROW_ID IN (1, 2)}).
+ * </ul>
+ */
+public class RowIdPredicateVisitor implements PredicateVisitor<List<Range>> {
+
+ @Override
+ public List<Range> visit(LeafPredicate predicate) {
+ if (ROW_ID.name().equals(predicate.fieldName())) {
+ LeafFunction function = predicate.function();
+ if (function instanceof Equal || function instanceof In) {
+ ArrayList<Long> rowIds = new ArrayList<>();
+ for (Object literal : predicate.literals()) {
+ rowIds.add((Long) literal);
+ }
+ // The list output by getRangesFromList is already sorted,
+ // and has no overlap
+ return Range.getRangesFromList(rowIds);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public List<Range> visit(CompoundPredicate predicate) {
+ CompoundPredicate.Function function = predicate.function();
+ List<Range> rowIds = null;
+ // `And` means we should get the intersection of all children.
+ if (function instanceof And) {
+ for (Predicate child : predicate.children()) {
+ List<Range> childList = child.visit(this);
+ if (childList == null) {
+ continue;
+ }
+
+ if (rowIds == null) {
+ rowIds = childList;
+ } else {
+ rowIds = Range.and(rowIds, childList);
+ }
+
+ // shortcut for intersection
+ if (rowIds.isEmpty()) {
+ return rowIds;
+ }
+ }
+ } else if (function instanceof Or) {
+ // `Or` means we should get the union of all children
+ rowIds = new ArrayList<>();
+ for (Predicate child : predicate.children()) {
+ List<Range> childList = child.visit(this);
+ if (childList == null) {
+ return null;
+ }
+
+ rowIds.addAll(childList);
+ rowIds = Range.sortAndMergeOverlap(rowIds, true);
+ }
+ } else {
+ // unexpected function type, just return null
+ return null;
+ }
+ return rowIds;
+ }
+
+ @Override
+ public List<Range> visit(TransformPredicate predicate) {
+        // Transform predicates are not supported yet.
+ return null;
+ }
+}
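To make the null-versus-empty-list contract above concrete, here is a hedged sketch of driving the visitor directly; PredicateBuilder's equal/in/or helpers and RowType.of are assumed to match Paimon's existing predicate API, and field 0 stands in for the _ROW_ID system column:

    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.PredicateBuilder;
    import org.apache.paimon.predicate.RowIdPredicateVisitor;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;
    import org.apache.paimon.utils.Range;

    import java.util.Arrays;
    import java.util.List;

    public class RowIdVisitorSketch {
        public static void main(String[] args) {
            RowType rowType =
                    RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {"_ROW_ID"});
            PredicateBuilder builder = new PredicateBuilder(rowType);

            // _ROW_ID = 0 OR _ROW_ID IN (1, 2): the union collapses to a single
            // range covering row ids 0..2.
            Predicate orPredicate =
                    PredicateBuilder.or(
                            builder.equal(0, 0L), builder.in(0, Arrays.asList((Object) 1L, 2L)));
            List<Range> ranges = orPredicate.visit(new RowIdPredicateVisitor());
            System.out.println(ranges.size()); // 1

            // _ROW_ID = 0 AND _ROW_ID = 1 is unsatisfiable: an empty list, not null.
            Predicate andPredicate =
                    PredicateBuilder.and(builder.equal(0, 0L), builder.equal(0, 1L));
            System.out.println(andPredicate.visit(new RowIdPredicateVisitor())); // []
        }
    }

A predicate on an ordinary data column, by contrast, is not convertible and makes visit return null, so the caller falls back to a normal filtered scan.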
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index daa5620725..7d5afd025f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/** Range represents from (inclusive) and to (inclusive). */
public class Range implements Serializable {
@@ -177,6 +178,34 @@ public class Range implements Serializable {
return result;
}
+ public static List<Range> getRangesFromList(List<Long> origLongs) {
+ if (origLongs == null || origLongs.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+        List<Long> longs =
+                origLongs.stream().distinct().sorted().collect(Collectors.toList());
+
+ ArrayList<Range> ranges = new ArrayList<>();
+ Long rangeStart = null;
+ Long rangeEnd = null;
+ for (Long cur : longs) {
+ if (rangeStart == null) {
+ rangeStart = cur;
+ rangeEnd = cur;
+ } else if (rangeEnd == cur - 1) {
+ rangeEnd = cur;
+ } else {
+ ranges.add(new Range(rangeStart, rangeEnd));
+ rangeStart = cur;
+ rangeEnd = cur;
+ }
+ }
+ if (rangeStart != null) {
+ ranges.add(new Range(rangeStart, rangeEnd));
+ }
+ return ranges;
+ }
+
/**
* Computes the intersection of two lists of ranges.
*
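The new helper is easiest to see with concrete input; a small sketch assuming only the method added above:

    import org.apache.paimon.utils.Range;

    import java.util.Arrays;
    import java.util.List;

    public class RangesFromListSketch {
        public static void main(String[] args) {
            // Duplicates are dropped, ids are sorted, and consecutive ids are
            // collapsed into one inclusive range each.
            List<Range> ranges =
                    Range.getRangesFromList(Arrays.asList(7L, 1L, 3L, 2L, 8L, 5L, 5L));
            System.out.println(ranges.size()); // 3 ranges: 1..3, 5..5 and 7..8
        }
    }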
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index abbc2e4912..d78bba4e8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -653,6 +653,12 @@ public class SchemaValidation {
"Data evolution config must disabled with
deletion-vectors.enabled");
}
+ if (options.rowIdPushDownEnabled()) {
+ checkArgument(
+ options.dataEvolutionEnabled(),
+                    "Row id push down config must be enabled with data-evolution.enabled");
+ }
+
        List<String> blobNames =
                BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldNames();
if (!blobNames.isEmpty()) {
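A standalone replica of the new check, assuming only the two CoreOptions accessors this diff references (rowIdPushDownEnabled and dataEvolutionEnabled):

    import org.apache.paimon.CoreOptions;

    import java.util.HashMap;
    import java.util.Map;

    public class RowIdPushDownValidationSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("row-id-push-down.enabled", "true");
            // data-evolution.enabled is absent, so this is exactly the
            // combination SchemaValidation now rejects.
            CoreOptions options = new CoreOptions(props);
            if (options.rowIdPushDownEnabled() && !options.dataEvolutionEnabled()) {
                throw new IllegalArgumentException(
                        "Row id push down config must be enabled with data-evolution.enabled");
            }
        }
    }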
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index c81dfd8e01..48765b7c50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -19,11 +19,15 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.data.variant.VariantAccessInfoUtils;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.RowIdPredicateVisitor;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.table.InnerTable;
@@ -33,12 +37,14 @@ import org.apache.paimon.utils.Range;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+import static org.apache.paimon.table.SpecialFields.ROW_ID;
import static org.apache.paimon.utils.Preconditions.checkState;
/** Implementation for {@link ReadBuilder}. */
@@ -65,7 +71,7 @@ public class ReadBuilderImpl implements ReadBuilder {
private @Nullable RowType readType;
private @Nullable VariantAccessInfo[] variantAccessInfo;
- private @Nullable List<Range> rowRanges;
+ public @Nullable @VisibleForTesting List<Range> rowRanges;
private @Nullable VectorSearch vectorSearch;
private boolean dropStats = false;
@@ -99,9 +105,55 @@ public class ReadBuilderImpl implements ReadBuilder {
} else {
this.filter = PredicateBuilder.and(this.filter, filter);
}
+ calculateRowRanges(this.filter);
+ this.filter = removeRowIdFilter(this.filter);
return this;
}
+ private void calculateRowRanges(Predicate filter) {
+ if (filter == null) {
+ return;
+ }
+
+ RowIdPredicateVisitor visitor = new RowIdPredicateVisitor();
+ List<Range> ranges = filter.visit(visitor);
+ // When rowRanges is not null, filter data based on rowRanges.
+ // If rowRanges is empty, it means no data will be read.
+ if (ranges != null) {
+ withRowRanges(ranges);
+ }
+ }
+
+ private Predicate removeRowIdFilter(Predicate filter) {
+ if (filter == null) {
+ return null;
+ }
+
+ if (filter instanceof LeafPredicate
+                && ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) {
+ return null;
+ } else if (filter instanceof CompoundPredicate) {
+ CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
+
+ List<Predicate> newChildren = new ArrayList<>();
+ for (Predicate child : compoundPredicate.children()) {
+ Predicate newChild = removeRowIdFilter(child);
+ if (newChild != null) {
+ newChildren.add(newChild);
+ }
+ }
+
+ if (newChildren.isEmpty()) {
+ return null;
+ } else if (newChildren.size() == 1) {
+ return newChildren.get(0);
+ } else {
+                return new CompoundPredicate(compoundPredicate.function(), newChildren);
+ }
+ }
+ return filter;
+ }
+
@Override
public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
if (partitionSpec != null) {
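The interplay of calculateRowRanges and removeRowIdFilter is easiest to see on a conjunction; a hedged sketch (PredicateBuilder helpers assumed, _ROW_ID modeled as field 1 of a two-column row type):

    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.PredicateBuilder;
    import org.apache.paimon.predicate.RowIdPredicateVisitor;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;
    import org.apache.paimon.utils.Range;

    import java.util.List;

    public class ReadBuilderSplitSketch {
        public static void main(String[] args) {
            RowType rowType =
                    RowType.of(
                            new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
                            new String[] {"a", "_ROW_ID"});
            PredicateBuilder builder = new PredicateBuilder(rowType);

            // a > 1 AND _ROW_ID = 3
            Predicate filter =
                    PredicateBuilder.and(builder.greaterThan(0, 1), builder.equal(1, 3L));

            // calculateRowRanges keeps the row id part as ranges ([3, 3] here) ...
            List<Range> ranges = filter.visit(new RowIdPredicateVisitor());
            System.out.println(ranges.size()); // 1

            // ... while removeRowIdFilter would leave only `a > 1` as the data
            // filter, so the row id leaf is not evaluated twice during the scan.
        }
    }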
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 806ce90e28..06b826b72a 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,17 +18,19 @@
package org.apache.paimon.spark
+import org.apache.paimon.CoreOptions
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
-import org.apache.paimon.table.Table
-import org.apache.paimon.types.RowType
+import org.apache.paimon.table.{InnerTable, Table}
+import org.apache.paimon.table.SpecialFields.ROW_ID
+import org.apache.paimon.types.{DataField, DataTypes, RowType}
import org.apache.spark.sql.connector.read.{SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
-import java.util.{List => JList}
+import java.util.{ArrayList, List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -41,6 +43,7 @@ abstract class PaimonBaseScanBuilder
val table: Table
val partitionKeys: JList[String] = table.partitionKeys()
val rowType: RowType = table.rowType()
+ val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
private var pushedSparkFilters = Array.empty[Filter]
protected var hasPostScanPredicates = false
@@ -65,7 +68,14 @@ abstract class PaimonBaseScanBuilder
val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
val postScan = mutable.ArrayBuffer.empty[Filter]
- val converter = new SparkFilterConverter(rowType)
+ var newRowType = rowType
+ if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
+ val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
+ dataFieldsWithRowId.add(
+        new DataField(rowType.getFieldCount, ROW_ID.name(), DataTypes.BIGINT()))
+ newRowType = rowType.copy(dataFieldsWithRowId)
+ }
+ val converter = new SparkFilterConverter(newRowType)
    val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
filters.foreach {
filter =>
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 71ab544b71..4ac5dd051b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,17 +18,19 @@
package org.apache.paimon.spark
+import org.apache.paimon.CoreOptions
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN}
-import org.apache.paimon.table.Table
-import org.apache.paimon.types.RowType
+import org.apache.paimon.table.{InnerTable, Table}
+import org.apache.paimon.table.SpecialFields.ROW_ID
+import org.apache.paimon.types.{DataField, DataTypes, RowType}
import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
import org.apache.spark.sql.types.StructType
-import java.util.{List => JList}
+import java.util.{ArrayList, List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -42,6 +44,7 @@ abstract class PaimonBaseScanBuilder
val table: Table
val partitionKeys: JList[String] = table.partitionKeys()
val rowType: RowType = table.rowType()
+ val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
private var pushedSparkPredicates = Array.empty[SparkPredicate]
protected var hasPostScanPredicates = false
@@ -63,7 +66,14 @@ abstract class PaimonBaseScanBuilder
val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
- val converter = SparkV2FilterConverter(rowType)
+ var newRowType = rowType
+ if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
+ val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
+ dataFieldsWithRowId.add(
+        new DataField(rowType.getFieldCount, ROW_ID.name(), DataTypes.BIGINT()))
+ newRowType = rowType.copy(dataFieldsWithRowId)
+ }
+ val converter = SparkV2FilterConverter(newRowType)
    val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
predicates.foreach {
predicate =>
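The Scala change above widens the converter's row type so that filters on the _ROW_ID system column can be translated at all; the same step in Java form, using only DataField/RowType methods that appear in this diff (the "_ROW_ID" literal stands in for ROW_ID.name()):

    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.ArrayList;
    import java.util.List;

    public class RowTypeWithRowIdSketch {
        static RowType appendRowId(RowType rowType) {
            // Append a BIGINT _ROW_ID field; the new field id reuses the next
            // free position, mirroring the Scala code above.
            List<DataField> fields = new ArrayList<>(rowType.getFields());
            fields.add(new DataField(rowType.getFieldCount(), "_ROW_ID", DataTypes.BIGINT()));
            return rowType.copy(fields);
        }
    }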
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
new file mode 100644
index 0000000000..1e9f2d11b5
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.ReadBuilderImpl
+import org.apache.paimon.utils.Range
+
+import org.apache.spark.sql.Row
+
+import scala.collection.JavaConverters._
+
+class RowIdPushDownTestBase extends PaimonSparkTestBase {
+
+ test("test paimon-spark row id push down") {
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, b INT, c STRING) TBLPROPERTIES " +
+        "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'row-id-push-down.enabled'='true')")
+      sql("INSERT INTO t VALUES (1, 1, '1'), (2, 2, '2'), (3, 3, '3'), (4, 4, '4')")
+
+ // 1.LeafPredicate
+ assertResult(Seq(new Range(0L, 0L)))(
+ getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0").readBuilder
+ .asInstanceOf[ReadBuilderImpl]
+ .rowRanges
+ .asScala)
+ checkAnswer(
+ sql("SELECT * FROM t WHERE _ROW_ID = 0"),
+ Seq(Row(1, 1, "1"))
+ )
+ assertResult(Seq(new Range(0L, 1L), new Range(3L, 3L)))(
+ getPaimonScan("SELECT * FROM t WHERE _ROW_ID IN (0, 1, 3)").readBuilder
+ .asInstanceOf[ReadBuilderImpl]
+ .rowRanges
+ .asScala)
+ checkAnswer(
+ sql("SELECT * FROM t WHERE _ROW_ID IN (0, 1, 3)"),
+ Seq(Row(1, 1, "1"), Row(2, 2, "2"), Row(4, 4, "4"))
+ )
+ assertResult(Seq(new Range(4L, 5L)))(
+ getPaimonScan("SELECT * FROM t WHERE _ROW_ID IN (4, 5)").readBuilder
+ .asInstanceOf[ReadBuilderImpl]
+ .rowRanges
+ .asScala)
+ checkAnswer(
+ sql("SELECT * FROM t WHERE _ROW_ID IN (4, 5)"),
+ Seq()
+ )
+
+ // 2.CompoundPredicate
+ assertResult(Seq(new Range(0, 0)))(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (0, 1)").readBuilder
+ .asInstanceOf[ReadBuilderImpl]
+ .rowRanges
+ .asScala)
+ checkAnswer(
+ sql("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (0, 1)"),
+ Seq(Row(1, 1, "1"))
+ )
+ assertResult(Seq(new Range(0, 2)))(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 OR _ROW_ID IN (1, 2)").readBuilder
+ .asInstanceOf[ReadBuilderImpl]
+ .rowRanges
+ .asScala)
+ checkAnswer(
+ sql("SELECT * FROM t WHERE _ROW_ID = 0 OR _ROW_ID IN (1, 2)"),
+ Seq(Row(1, 1, "1"), Row(2, 2, "2"), Row(3, 3, "3"))
+ )
+ assertResult(Seq())(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (1, 2)").readBuilder
+ .asInstanceOf[ReadBuilderImpl]
+ .rowRanges
+ .asScala)
+ checkAnswer(
+ sql("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (1, 2)"),
+ Seq()
+ )
+ }
+ }
+}