This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2833e3226a [core] Fix that partition filter wasn't transformed by field mapping when scan (#6054)
2833e3226a is described below

commit 2833e3226afc6bb6b4cf4aa282a7d7f833e04819
Author: yuzelin <[email protected]>
AuthorDate: Mon Aug 11 16:52:40 2025 +0800

    [core] Fix that partition filter wasn't transformed by field mapping when scan (#6054)
---
 .../apache/paimon/predicate/PredicateBuilder.java  |  4 +++
 .../table/source/snapshot/SnapshotReaderImpl.java  |  6 ++--
 .../paimon/flink/source/FlinkTableSource.java      | 34 +++++++++++++++-------
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 12 ++++++++
 4 files changed, 41 insertions(+), 15 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 32df92601b..63c5dc2206 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -419,4 +419,8 @@ public class PredicateBuilder {
                         .map(p -> PredicateBuilder.partition(p, rowType, 
defaultPartValue))
                         .toArray(Predicate[]::new));
     }
+
+    public static int[] fieldIdxToPartitionIdx(RowType tableType, List<String> 
partitionKeys) {
+        return 
tableType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index aaa1e9e635..858d75ceca 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -218,11 +218,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
     @Override
     public SnapshotReader withFilter(Predicate predicate) {
-        List<String> partitionKeys = tableSchema.partitionKeys();
         int[] fieldIdxToPartitionIdx =
-                tableSchema.fields().stream()
-                        .mapToInt(f -> partitionKeys.indexOf(f.name()))
-                        .toArray();
+                PredicateBuilder.fieldIdxToPartitionIdx(
+                        tableSchema.logicalRowType(), 
tableSchema.partitionKeys());
 
         List<Predicate> partitionFilters = new ArrayList<>();
         List<Predicate> nonPartitionFilters = new ArrayList<>();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 621ceb7357..d0e79e2a05 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -53,6 +53,7 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
+import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A Flink {@link ScanTableSource} for paimon. */
 public abstract class FlinkTableSource
@@ -71,6 +72,7 @@ public abstract class FlinkTableSource
     protected final Options options;
 
     @Nullable protected Predicate predicate;
+
     /**
      * This field is only used for normal source (not lookup source). 
Specified partitions in lookup
      * sources are handled in {@link 
org.apache.paimon.flink.lookup.PartitionLoader}.
@@ -139,18 +141,28 @@ public abstract class FlinkTableSource
      */
     private PartitionPredicate getPartitionPredicateWithOptions() {
         if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
-            PartitionPredicate partitionPredicate;
             try {
-                partitionPredicate =
-                        PartitionPredicate.fromPredicate(
-                                table.rowType().project(table.partitionKeys()),
-                                PartitionPredicate.createPartitionPredicate(
-                                        ParameterUtils.getPartitions(
-                                                
options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
-                                                        .split(";")),
-                                        table.rowType(),
-                                        
options.get(CoreOptions.PARTITION_DEFAULT_NAME)));
-                return partitionPredicate;
+                Predicate predicate =
+                        PartitionPredicate.createPartitionPredicate(
+                                ParameterUtils.getPartitions(
+                                        
options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
+                                                .split(";")),
+                                table.rowType(),
+                                
options.get(CoreOptions.PARTITION_DEFAULT_NAME));
+                // Partition filter will be used to filter Manifest stats, the 
stats schema is
+                // partition type. See SnapshotReaderImpl#withFilter
+                Predicate transformed =
+                        transformFieldMapping(
+                                        predicate,
+                                        
PredicateBuilder.fieldIdxToPartitionIdx(
+                                                table.rowType(), 
table.partitionKeys()))
+                                .orElseThrow(
+                                        () ->
+                                                new RuntimeException(
+                                                        "Failed to transform 
the partition predicate "
+                                                                + predicate));
+                return PartitionPredicate.fromPredicate(
+                        table.rowType().project(table.partitionKeys()), 
transformed);
             } catch (IllegalArgumentException e) {
                 // In older versions of Flink, however, lookup sources will 
first be treated as
                 // normal sources. So this method will also be visited by 
lookup tables, whose
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index ac57e7d874..af16f0a6ed 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -829,6 +829,18 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), 
Row.of(2, 22));
     }
 
+    @Test
+    public void testScanWithSpecifiedPartitionsWithFieldMapping() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT)");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 
'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2)");
+        String query =
+                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'pt=b;pt=c') */ ON Q.id = P.id ORDER BY Q.id, P.v";
+        assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), 
Row.of(2, 22));
+    }
+
     @Test
     public void testEmptyTableIncrementalBetweenTimestamp() {
         assertThat(sql("SELECT * FROM T /*+ 
OPTIONS('incremental-between-timestamp'='0,1') */"))

Reply via email to