This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 2833e3226a [core] Fix that partition filter wasn't transformed by field mapping when scanning (#6054)
2833e3226a is described below
commit 2833e3226afc6bb6b4cf4aa282a7d7f833e04819
Author: yuzelin <[email protected]>
AuthorDate: Mon Aug 11 16:52:40 2025 +0800
    [core] Fix that partition filter wasn't transformed by field mapping when scanning (#6054)
---
.../apache/paimon/predicate/PredicateBuilder.java | 4 +++
.../table/source/snapshot/SnapshotReaderImpl.java | 6 ++--
.../paimon/flink/source/FlinkTableSource.java | 34 +++++++++++++++-------
.../apache/paimon/flink/BatchFileStoreITCase.java | 12 ++++++++
4 files changed, 41 insertions(+), 15 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 32df92601b..63c5dc2206 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -419,4 +419,8 @@ public class PredicateBuilder {
                         .map(p -> PredicateBuilder.partition(p, rowType, defaultPartValue))
                         .toArray(Predicate[]::new));
     }
+
+    public static int[] fieldIdxToPartitionIdx(RowType tableType, List<String> partitionKeys) {
+        return tableType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+    }
 }
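
For context, a minimal sketch (not part of the commit) of what the new helper computes, using the same table shape as the test at the end of this diff and assuming the usual org.apache.paimon.types factory methods. For each table field it records that field's index among the partition keys, or -1 when the field is not a partition key:

    // Hypothetical illustration: table (id, v, pt) partitioned by pt.
    RowType tableType =
            RowType.of(
                    new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()},
                    new String[] {"id", "v", "pt"});
    int[] mapping =
            PredicateBuilder.fieldIdxToPartitionIdx(
                    tableType, Collections.singletonList("pt"));
    // mapping == [-1, -1, 0]: "id" and "v" are not partition keys; "pt"
    // (table field index 2) sits at index 0 of the partition row type.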
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index aaa1e9e635..858d75ceca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -218,11 +218,9 @@ public class SnapshotReaderImpl implements SnapshotReader {

     @Override
     public SnapshotReader withFilter(Predicate predicate) {
-        List<String> partitionKeys = tableSchema.partitionKeys();
         int[] fieldIdxToPartitionIdx =
-                tableSchema.fields().stream()
-                        .mapToInt(f -> partitionKeys.indexOf(f.name()))
-                        .toArray();
+                PredicateBuilder.fieldIdxToPartitionIdx(
+                        tableSchema.logicalRowType(), tableSchema.partitionKeys());

         List<Predicate> partitionFilters = new ArrayList<>();
         List<Predicate> nonPartitionFilters = new ArrayList<>();
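
The remainder of withFilter (unchanged by this commit) uses that mapping to split the pushed-down conjunction: sub-predicates touching only partition fields are remapped onto the partition row type and collected as partition filters, everything else stays a non-partition filter. A rough sketch of that split, assuming the splitAnd and transformFieldMapping helpers already in PredicateBuilder:

    // Simplified sketch, not a verbatim quote of SnapshotReaderImpl.
    for (Predicate sub : PredicateBuilder.splitAnd(predicate)) {
        Optional<Predicate> mapped =
                PredicateBuilder.transformFieldMapping(sub, fieldIdxToPartitionIdx);
        if (mapped.isPresent()) {
            partitionFilters.add(mapped.get()); // indices now refer to the partition type
        } else {
            nonPartitionFilters.add(sub); // touches at least one non-partition field
        }
    }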
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 621ceb7357..d0e79e2a05 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -53,6 +53,7 @@ import java.util.List;
 import java.util.Optional;

 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
+import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;

 /** A Flink {@link ScanTableSource} for paimon. */
 public abstract class FlinkTableSource
@@ -71,6 +72,7 @@ public abstract class FlinkTableSource
     protected final Options options;

     @Nullable protected Predicate predicate;
+
     /**
      * This field is only used for normal source (not lookup source). Specified partitions in lookup
      * sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
@@ -139,18 +141,28 @@ public abstract class FlinkTableSource
      */
     private PartitionPredicate getPartitionPredicateWithOptions() {
         if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
-            PartitionPredicate partitionPredicate;
             try {
-                partitionPredicate =
-                        PartitionPredicate.fromPredicate(
-                                table.rowType().project(table.partitionKeys()),
-                                PartitionPredicate.createPartitionPredicate(
-                                        ParameterUtils.getPartitions(
-                                                options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
-                                                        .split(";")),
-                                        table.rowType(),
-                                        options.get(CoreOptions.PARTITION_DEFAULT_NAME)));
-                return partitionPredicate;
+                Predicate predicate =
+                        PartitionPredicate.createPartitionPredicate(
+                                ParameterUtils.getPartitions(
+                                        options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
+                                                .split(";")),
+                                table.rowType(),
+                                options.get(CoreOptions.PARTITION_DEFAULT_NAME));
+                // Partition filter will be used to filter Manifest stats, the stats schema is
+                // partition type. See SnapshotReaderImpl#withFilter
+                Predicate transformed =
+                        transformFieldMapping(
+                                        predicate,
+                                        PredicateBuilder.fieldIdxToPartitionIdx(
+                                                table.rowType(), table.partitionKeys()))
+                                .orElseThrow(
+                                        () ->
+                                                new RuntimeException(
+                                                        "Failed to transform the partition predicate "
+                                                                + predicate));
+                return PartitionPredicate.fromPredicate(
+                        table.rowType().project(table.partitionKeys()), transformed);
             } catch (IllegalArgumentException e) {
                 // In older versions of Flink, however, lookup sources will first be treated as
                 // normal sources. So this method will also be visited by lookup tables, whose
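
To make the fix concrete: createPartitionPredicate builds its predicate against the full table row type, while fromPredicate evaluates it against the row type projected to the partition keys, so before this change the field indices did not line up. For the test table below (id, v, pt), a filter on pt used to carry table field index 2, which the single-field partition type cannot resolve; after the transform it carries index 0. A hypothetical condensed view of the remapping step:

    // "pt = 'b'" built on the table type references field index 2.
    int[] mapping =
            PredicateBuilder.fieldIdxToPartitionIdx(
                    table.rowType(), table.partitionKeys()); // [-1, -1, 0]
    // After transformFieldMapping, the same predicate references index 0,
    // matching the projected partition row type used by fromPredicate.
    Predicate onPartitionType =
            PredicateBuilder.transformFieldMapping(predicate, mapping).get();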
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index ac57e7d874..af16f0a6ed 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -829,6 +829,18 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), Row.of(2, 22));
     }

+    @Test
+    public void testScanWithSpecifiedPartitionsWithFieldMapping() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT)");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2)");
+        String query =
+                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'pt=b;pt=c') */ ON Q.id = P.id ORDER BY Q.id, P.v";
+        assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), Row.of(2, 22));
+    }
+
     @Test
     public void testEmptyTableIncrementalBetweenTimestamp() {
         assertThat(sql("SELECT * FROM T /*+ OPTIONS('incremental-between-timestamp'='0,1') */"))