This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b6d2302df3 [spark] paimon-spark supports row id push down (#6697)
b6d2302df3 is described below

commit b6d2302df303af6eca1e0e333bfb48ab08ee4e0c
Author: Xiao Zhu <[email protected]>
AuthorDate: Sat Dec 27 22:39:21 2025 +0800

    [spark] paimon-spark supports row id push down (#6697)
---
 .../shortcodes/generated/core_configuration.html   |   6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  12 +++
 .../paimon/predicate/RowIdPredicateVisitor.java    | 109 +++++++++++++++++++++
 .../main/java/org/apache/paimon/utils/Range.java   |  29 ++++++
 .../org/apache/paimon/schema/SchemaValidation.java |   6 ++
 .../paimon/table/source/ReadBuilderImpl.java       |  54 +++++++++-
 .../paimon/spark/PaimonBaseScanBuilder.scala       |  18 +++-
 .../paimon/spark/sql/RowIdPushDownTest.scala       |  21 ++++
 .../paimon/spark/sql/RowIdPushDownTest.scala       |  21 ++++
 .../paimon/spark/sql/RowIdPushDownTest.scala       |  21 ++++
 .../paimon/spark/sql/RowIdPushDownTest.scala       |  21 ++++
 .../paimon/spark/sql/RowIdPushDownTest.scala       |  21 ++++
 .../paimon/spark/PaimonBaseScanBuilder.scala       |  18 +++-
 .../paimon/spark/sql/RowIdPushDownTestBase.scala   |  96 ++++++++++++++++++
 14 files changed, 444 insertions(+), 9 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index fc7a0c3839..36950aa916 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1037,6 +1037,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>String</td>
             <td>Time field for record level expire. It supports the following 
types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, 
`timestamps in milliseconds with BIGINT` or `timestamp`.</td>
         </tr>
+        <tr>
+            <td><h5>row-id-push-down.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable row id push down for scan. Currently, only the data evolution table supports row id push down.</td>
+        </tr>
         <tr>
             <td><h5>row-tracking.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 20aa5182d6..324608988d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2146,6 +2146,14 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to try upgrading the data files after 
overwriting a primary key table.");
 
+    public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
+            key("row-id-push-down.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable row id push down for scan."
+                                    + " Currently, only the data evolution table supports row id push down.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -3330,6 +3338,10 @@ public class CoreOptions implements Serializable {
         return options.get(OVERWRITE_UPGRADE);
     }
 
+    public boolean rowIdPushDownEnabled() {
+        return options.get(ROW_ID_PUSH_DOWN_ENABLED);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
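
For reference, a minimal sketch of reading the new flag through CoreOptions; the class name and the options map contents here are illustrative, not part of the patch:

    import org.apache.paimon.CoreOptions;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class RowIdPushDownOptionSketch {
        public static void main(String[] args) {
            // Default is false when the option is absent.
            System.out.println(
                    new CoreOptions(Collections.<String, String>emptyMap())
                            .rowIdPushDownEnabled()); // false

            // Enable it via the table options map.
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("row-id-push-down.enabled", "true");
            System.out.println(
                    new CoreOptions(tableOptions).rowIdPushDownEnabled()); // true
        }
    }
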
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
new file mode 100644
index 0000000000..d8ae6f6a6d
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.utils.Range;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.table.SpecialFields.ROW_ID;
+
+/**
+ * The {@link PredicateVisitor} to extract a list of row id ranges from predicates. The returned row
+ * id ranges can be pushed down to manifest readers and file readers to enable efficient random
+ * access.
+ *
+ * <p>Note that there is a significant distinction between returning {@code null} and returning an
+ * empty list:
+ *
+ * <ul>
+ *   <li>{@code null} indicates that the predicate cannot be converted into a random-access pattern,
+ *       meaning the filter is not consumable by this visitor.
+ *   <li>An empty list indicates that no rows satisfy the predicate (e.g. {@code WHERE _ROW_ID = 3
+ *       AND _ROW_ID IN (1, 2)}).
+ * </ul>
+ */
+public class RowIdPredicateVisitor implements PredicateVisitor<List<Range>> {
+
+    @Override
+    public List<Range> visit(LeafPredicate predicate) {
+        if (ROW_ID.name().equals(predicate.fieldName())) {
+            LeafFunction function = predicate.function();
+            if (function instanceof Equal || function instanceof In) {
+                ArrayList<Long> rowIds = new ArrayList<>();
+                for (Object literal : predicate.literals()) {
+                    rowIds.add((Long) literal);
+                }
+                // The list output by getRangesFromList is already sorted,
+                // and has no overlap
+                return Range.getRangesFromList(rowIds);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public List<Range> visit(CompoundPredicate predicate) {
+        CompoundPredicate.Function function = predicate.function();
+        List<Range> rowIds = null;
+        // `And` means we should get the intersection of all children.
+        if (function instanceof And) {
+            for (Predicate child : predicate.children()) {
+                List<Range> childList = child.visit(this);
+                if (childList == null) {
+                    continue;
+                }
+
+                if (rowIds == null) {
+                    rowIds = childList;
+                } else {
+                    rowIds = Range.and(rowIds, childList);
+                }
+
+                // shortcut for intersection
+                if (rowIds.isEmpty()) {
+                    return rowIds;
+                }
+            }
+        } else if (function instanceof Or) {
+            // `Or` means we should get the union of all children
+            rowIds = new ArrayList<>();
+            for (Predicate child : predicate.children()) {
+                List<Range> childList = child.visit(this);
+                if (childList == null) {
+                    return null;
+                }
+
+                rowIds.addAll(childList);
+                rowIds = Range.sortAndMergeOverlap(rowIds, true);
+            }
+        } else {
+            // unexpected function type, just return null
+            return null;
+        }
+        return rowIds;
+    }
+
+    @Override
+    public List<Range> visit(TransformPredicate predicate) {
+        // Transform predicates are not supported yet.
+        return null;
+    }
+}
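
A small sketch of the null-vs-empty contract above, assuming RowType.of(types, names) and PredicateBuilder's equal/in helpers; the class name and the row type (whose second field is _ROW_ID) are illustrative:

    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.PredicateBuilder;
    import org.apache.paimon.predicate.RowIdPredicateVisitor;
    import org.apache.paimon.table.SpecialFields;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;
    import org.apache.paimon.utils.Range;

    import java.util.Arrays;
    import java.util.List;

    public class RowIdVisitorSketch {
        public static void main(String[] args) {
            RowType rowType =
                    RowType.of(
                            new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
                            new String[] {"a", SpecialFields.ROW_ID.name()});
            PredicateBuilder builder = new PredicateBuilder(rowType);
            RowIdPredicateVisitor visitor = new RowIdPredicateVisitor();

            // _ROW_ID IN (0, 1, 3) -> sorted, merged ranges [0, 1] and [3, 3].
            List<Range> ranges =
                    builder.in(1, Arrays.<Object>asList(0L, 1L, 3L)).visit(visitor);

            // _ROW_ID = 3 AND _ROW_ID IN (1, 2) -> empty list: no rows match.
            Predicate conflicting =
                    PredicateBuilder.and(
                            builder.equal(1, 3L),
                            builder.in(1, Arrays.<Object>asList(1L, 2L)));
            List<Range> empty = conflicting.visit(visitor); // size() == 0

            // a = 5 -> null: not convertible to a random-access pattern.
            List<Range> notConvertible = builder.equal(0, 5).visit(visitor); // null
        }
    }
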
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index daa5620725..7d5afd025f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /** Range represents from (inclusive) and to (inclusive). */
 public class Range implements Serializable {
@@ -177,6 +178,34 @@ public class Range implements Serializable {
         return result;
     }
 
+    public static List<Range> getRangesFromList(List<Long> origLongs) {
+        if (origLongs == null || origLongs.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<Long> longs = origLongs.stream().distinct().sorted().collect(Collectors.toList());
+
+        ArrayList<Range> ranges = new ArrayList<>();
+        Long rangeStart = null;
+        Long rangeEnd = null;
+        for (Long cur : longs) {
+            if (rangeStart == null) {
+                rangeStart = cur;
+                rangeEnd = cur;
+            } else if (rangeEnd == cur - 1) {
+                rangeEnd = cur;
+            } else {
+                ranges.add(new Range(rangeStart, rangeEnd));
+                rangeStart = cur;
+                rangeEnd = cur;
+            }
+        }
+        if (rangeStart != null) {
+            ranges.add(new Range(rangeStart, rangeEnd));
+        }
+        return ranges;
+    }
+
     /**
      * Computes the intersection of two lists of ranges.
      *
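
A sketch of the new helper's behavior (values illustrative): duplicates are dropped, ids are sorted, and consecutive ids are merged into single ranges.

    import org.apache.paimon.utils.Range;

    import java.util.Arrays;
    import java.util.List;

    public class RangeFromListSketch {
        public static void main(String[] args) {
            // [5, 1, 2, 2, 3, 9] -> [Range(1, 3), Range(5, 5), Range(9, 9)]
            List<Range> ranges =
                    Range.getRangesFromList(Arrays.asList(5L, 1L, 2L, 2L, 3L, 9L));
            System.out.println(ranges.size()); // 3
        }
    }
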
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index abbc2e4912..d78bba4e8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -653,6 +653,12 @@ public class SchemaValidation {
                     "Data evolution config must disabled with 
deletion-vectors.enabled");
         }
 
+        if (options.rowIdPushDownEnabled()) {
+            checkArgument(
+                    options.dataEvolutionEnabled(),
+                    "Row id push down config must be enabled with data-evolution.enabled");
+        }
+
         List<String> blobNames =
                 BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldNames();
         if (!blobNames.isEmpty()) {
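
The new rule pairs the two options. A standalone restatement of the check, under the assumption that it is invoked wherever a schema is created or altered (class and method names are mine, not part of the patch):

    import org.apache.paimon.CoreOptions;

    import static org.apache.paimon.utils.Preconditions.checkArgument;

    public class RowIdPushDownValidationSketch {
        // Row id push down is only supported on data evolution tables.
        static void validateRowIdPushDown(CoreOptions options) {
            if (options.rowIdPushDownEnabled()) {
                checkArgument(
                        options.dataEvolutionEnabled(),
                        "Row id push down config must be enabled with data-evolution.enabled");
            }
        }
    }
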
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index c81dfd8e01..48765b7c50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -19,11 +19,15 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.data.variant.VariantAccessInfoUtils;
 import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.RowIdPredicateVisitor;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.predicate.VectorSearch;
 import org.apache.paimon.table.InnerTable;
@@ -33,12 +37,14 @@ import org.apache.paimon.utils.Range;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+import static org.apache.paimon.table.SpecialFields.ROW_ID;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Implementation for {@link ReadBuilder}. */
@@ -65,7 +71,7 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     private @Nullable RowType readType;
     private @Nullable VariantAccessInfo[] variantAccessInfo;
-    private @Nullable List<Range> rowRanges;
+    public @Nullable @VisibleForTesting List<Range> rowRanges;
     private @Nullable VectorSearch vectorSearch;
 
     private boolean dropStats = false;
@@ -99,9 +105,55 @@ public class ReadBuilderImpl implements ReadBuilder {
         } else {
             this.filter = PredicateBuilder.and(this.filter, filter);
         }
+        calculateRowRanges(this.filter);
+        this.filter = removeRowIdFilter(this.filter);
         return this;
     }
 
+    private void calculateRowRanges(Predicate filter) {
+        if (filter == null) {
+            return;
+        }
+
+        RowIdPredicateVisitor visitor = new RowIdPredicateVisitor();
+        List<Range> ranges = filter.visit(visitor);
+        // When rowRanges is not null, filter data based on rowRanges.
+        // If rowRanges is empty, it means no data will be read.
+        if (ranges != null) {
+            withRowRanges(ranges);
+        }
+    }
+
+    private Predicate removeRowIdFilter(Predicate filter) {
+        if (filter == null) {
+            return null;
+        }
+
+        if (filter instanceof LeafPredicate
+                && ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) {
+            return null;
+        } else if (filter instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
+
+            List<Predicate> newChildren = new ArrayList<>();
+            for (Predicate child : compoundPredicate.children()) {
+                Predicate newChild = removeRowIdFilter(child);
+                if (newChild != null) {
+                    newChildren.add(newChild);
+                }
+            }
+
+            if (newChildren.isEmpty()) {
+                return null;
+            } else if (newChildren.size() == 1) {
+                return newChildren.get(0);
+            } else {
+                return new CompoundPredicate(compoundPredicate.function(), newChildren);
+            }
+        }
+        return filter;
+    }
+
     @Override
     public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
         if (partitionSpec != null) {
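
Taken together, withFilter now splits a conjunction: _ROW_ID conjuncts become rowRanges, everything else survives as the residual pushed filter. A hedged sketch under assumed inputs (the table, builder, and field indices are assumptions for illustration):

    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.PredicateBuilder;
    import org.apache.paimon.table.InnerTable;
    import org.apache.paimon.table.source.ReadBuilder;

    public class RowIdFilterSplitSketch {
        static ReadBuilder sketch(
                InnerTable table, PredicateBuilder builder, int rowIdIndex, int aIndex) {
            Predicate mixed =
                    PredicateBuilder.and(
                            builder.equal(rowIdIndex, 0L), // -> rowRanges = [Range(0, 0)]
                            builder.equal(aIndex, 5));     // -> stays as the pushed filter
            return table.newReadBuilder().withFilter(mixed);
        }
    }
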
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 806ce90e28..06b826b72a 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,17 +18,19 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
-import org.apache.paimon.table.Table
-import org.apache.paimon.types.RowType
+import org.apache.paimon.table.{InnerTable, Table}
+import org.apache.paimon.table.SpecialFields.ROW_ID
+import org.apache.paimon.types.{DataField, DataTypes, RowType}
 
 import org.apache.spark.sql.connector.read.{SupportsPushDownFilters, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
-import java.util.{List => JList}
+import java.util.{ArrayList, List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -41,6 +43,7 @@ abstract class PaimonBaseScanBuilder
   val table: Table
   val partitionKeys: JList[String] = table.partitionKeys()
   val rowType: RowType = table.rowType()
+  val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
 
   private var pushedSparkFilters = Array.empty[Filter]
   protected var hasPostScanPredicates = false
@@ -65,7 +68,14 @@ abstract class PaimonBaseScanBuilder
     val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
     val postScan = mutable.ArrayBuffer.empty[Filter]
 
-    val converter = new SparkFilterConverter(rowType)
+    var newRowType = rowType
+    if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
+      val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
+      dataFieldsWithRowId.add(
+        new DataField(rowType.getFieldCount, ROW_ID.name(), DataTypes.BIGINT()))
+      newRowType = rowType.copy(dataFieldsWithRowId)
+    }
+    val converter = new SparkFilterConverter(newRowType)
     val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
     filters.foreach {
       filter =>
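
The same row-type extension, sketched in Java (helper and class names are illustrative): a synthetic _ROW_ID BIGINT field is appended so that filters on _ROW_ID can be converted alongside ordinary column filters.

    import org.apache.paimon.table.SpecialFields;
    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.ArrayList;
    import java.util.List;

    public class RowTypeWithRowIdSketch {
        // Append a synthetic _ROW_ID BIGINT field with the next field id.
        static RowType withRowId(RowType rowType) {
            List<DataField> fields = new ArrayList<>(rowType.getFields());
            fields.add(
                    new DataField(
                            rowType.getFieldCount(),
                            SpecialFields.ROW_ID.name(),
                            DataTypes.BIGINT()));
            return rowType.copy(fields);
        }
    }
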
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
new file mode 100644
index 0000000000..da4c9b854d
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class RowIdPushDownTest extends RowIdPushDownTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 71ab544b71..4ac5dd051b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,17 +18,19 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN}
-import org.apache.paimon.table.Table
-import org.apache.paimon.types.RowType
+import org.apache.paimon.table.{InnerTable, Table}
+import org.apache.paimon.table.SpecialFields.ROW_ID
+import org.apache.paimon.types.{DataField, DataTypes, RowType}
 
 import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
 import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
 import org.apache.spark.sql.types.StructType
 
-import java.util.{List => JList}
+import java.util.{ArrayList, List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -42,6 +44,7 @@ abstract class PaimonBaseScanBuilder
   val table: Table
   val partitionKeys: JList[String] = table.partitionKeys()
   val rowType: RowType = table.rowType()
+  val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
 
   private var pushedSparkPredicates = Array.empty[SparkPredicate]
   protected var hasPostScanPredicates = false
@@ -63,7 +66,14 @@ abstract class PaimonBaseScanBuilder
     val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
     val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
 
-    val converter = SparkV2FilterConverter(rowType)
+    var newRowType = rowType
+    if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
+      val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
+      dataFieldsWithRowId.add(
+        new DataField(rowType.getFieldCount, ROW_ID.name(), DataTypes.BIGINT()))
+      newRowType = rowType.copy(dataFieldsWithRowId)
+    }
+    val converter = SparkV2FilterConverter(newRowType)
     val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
     predicates.foreach {
       predicate =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
new file mode 100644
index 0000000000..1e9f2d11b5
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.ReadBuilderImpl
+import org.apache.paimon.utils.Range
+
+import org.apache.spark.sql.Row
+
+import scala.collection.JavaConverters._
+
+class RowIdPushDownTestBase extends PaimonSparkTestBase {
+
+  test("test paimon-spark row id push down") {
+    withTable("t") {
+      sql("CREATE TABLE t (a INT, b INT, c STRING) TBLPROPERTIES " +
+        "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'row-id-push-down.enabled'='true')")
+      sql("INSERT INTO t VALUES (1, 1, '1'), (2, 2, '2'), (3, 3, '3'), (4, 4, '4')")
+
+      // 1.LeafPredicate
+      assertResult(Seq(new Range(0L, 0L)))(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0").readBuilder
+          .asInstanceOf[ReadBuilderImpl]
+          .rowRanges
+          .asScala)
+      checkAnswer(
+        sql("SELECT * FROM t WHERE _ROW_ID = 0"),
+        Seq(Row(1, 1, "1"))
+      )
+      assertResult(Seq(new Range(0L, 1L), new Range(3L, 3L)))(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID IN (0, 1, 3)").readBuilder
+          .asInstanceOf[ReadBuilderImpl]
+          .rowRanges
+          .asScala)
+      checkAnswer(
+        sql("SELECT * FROM t WHERE _ROW_ID IN (0, 1, 3)"),
+        Seq(Row(1, 1, "1"), Row(2, 2, "2"), Row(4, 4, "4"))
+      )
+      assertResult(Seq(new Range(4L, 5L)))(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID IN (4, 5)").readBuilder
+          .asInstanceOf[ReadBuilderImpl]
+          .rowRanges
+          .asScala)
+      checkAnswer(
+        sql("SELECT * FROM t WHERE _ROW_ID IN (4, 5)"),
+        Seq()
+      )
+
+      // 2.CompoundPredicate
+      assertResult(Seq(new Range(0, 0)))(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (0, 1)").readBuilder
+          .asInstanceOf[ReadBuilderImpl]
+          .rowRanges
+          .asScala)
+      checkAnswer(
+        sql("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (0, 1)"),
+        Seq(Row(1, 1, "1"))
+      )
+      assertResult(Seq(new Range(0, 2)))(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 OR _ROW_ID IN (1, 2)").readBuilder
+          .asInstanceOf[ReadBuilderImpl]
+          .rowRanges
+          .asScala)
+      checkAnswer(
+        sql("SELECT * FROM t WHERE _ROW_ID = 0 OR _ROW_ID IN (1, 2)"),
+        Seq(Row(1, 1, "1"), Row(2, 2, "2"), Row(3, 3, "3"))
+      )
+      assertResult(Seq())(
+        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (1, 2)").readBuilder
+          .asInstanceOf[ReadBuilderImpl]
+          .rowRanges
+          .asScala)
+      checkAnswer(
+        sql("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (1, 2)"),
+        Seq()
+      )
+    }
+  }
+}
