This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d139659c95 Revert "[flink] Flink batch job support specify partition with max_pt() and max_two_pt()" without review
d139659c95 is described below

commit d139659c956fe821eeefe8b4d816ac80bc361dcd
Author: yuzelin <[email protected]>
AuthorDate: Tue Dec 2 22:19:00 2025 +0800

    Revert "[flink] Flink batch job support specify partition with max_pt() and 
max_two_pt()" without review
    
    This reverts commit 0d6db479f4879599b2da4fa8cbfad413276720bc.
---
 .../flink/lookup/DynamicPartitionLevelLoader.java  | 13 ++--
 .../flink/lookup/DynamicPartitionLoader.java       |  6 +-
 .../flink/lookup/DynamicPartitionNumberLoader.java |  8 +--
 .../flink/lookup/FileStoreLookupFunction.java      |  4 +-
 .../paimon/flink/lookup/PartitionLoader.java       | 14 ++---
 .../paimon/flink/lookup/StaticPartitionLoader.java | 13 ++--
 .../paimon/flink/source/FlinkTableSource.java      | 70 ++++++++++------------
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 40 -------------
 8 files changed, 60 insertions(+), 108 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
index 88920c05fc..e4ffe2a5e8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
@@ -20,8 +20,7 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
@@ -35,8 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
-
 /** Dynamic partition loader which can specify the partition level to load for lookup. */
 public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
 
@@ -50,12 +47,14 @@ public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
     private final String defaultPartitionName;
 
     DynamicPartitionLevelLoader(
-            Table table, Duration refreshInterval, Map<String, String> partitionLoadConfig) {
+            FileStoreTable table,
+            Duration refreshInterval,
+            Map<String, String> partitionLoadConfig) {
         super(table, refreshInterval);
         maxPartitionLoadLevel =
                 getMaxPartitionLoadLevel(partitionLoadConfig, table.partitionKeys());
         fieldGetters = createPartitionFieldGetters();
-        defaultPartitionName = Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
+        defaultPartitionName = table.coreOptions().partitionDefaultName();
 
         LOG.info(
                 "Init 
DynamicPartitionLevelLoader(table={}),maxPartitionLoadLevel is {}",
@@ -64,7 +63,7 @@ public class DynamicPartitionLevelLoader extends 
DynamicPartitionLoader {
     }
 
     @Override
-    public List<BinaryRow> getMaxPartitions() {
+    protected List<BinaryRow> getMaxPartitions() {
         List<BinaryRow> newPartitions =
                 table.newReadBuilder().newScan().listPartitions().stream()
                         .sorted(comparator.reversed())
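
Side note on the option change in this hunk: the revert restores the typed accessor table.coreOptions().partitionDefaultName(), where the reverted commit had read the raw option map. Both resolve the same `partition.default-name` table option; a minimal sketch under that assumption (the helper class is hypothetical, not part of this patch):

    import org.apache.paimon.options.Options;
    import org.apache.paimon.table.FileStoreTable;

    import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;

    final class PartitionDefaults {
        // Raw lookup on the option map, as in the reverted commit.
        static String viaRawOptions(FileStoreTable table) {
            return Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
        }

        // Typed accessor, as restored by this revert.
        static String viaCoreOptions(FileStoreTable table) {
            return table.coreOptions().partitionDefaultName();
        }
    }
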
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 37671dee0f..201997da55 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.lookup;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 
@@ -46,7 +46,7 @@ public abstract class DynamicPartitionLoader extends PartitionLoader {
     protected transient Comparator<InternalRow> comparator;
     protected transient LocalDateTime lastRefresh;
 
-    DynamicPartitionLoader(Table table, Duration refreshInterval) {
+    DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval) {
         super(table);
         this.refreshInterval = refreshInterval;
     }
@@ -92,7 +92,7 @@ public abstract class DynamicPartitionLoader extends PartitionLoader {
         }
     }
 
-    public abstract List<BinaryRow> getMaxPartitions();
+    protected abstract List<BinaryRow> getMaxPartitions();
 
     private void logNewPartitions() {
         String partitionsStr = partitionsToString(partitions);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
index 7e210725de..52c1d4c76c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,8 @@ public class DynamicPartitionNumberLoader extends DynamicPartitionLoader {
 
     private final int maxPartitionNum;
 
-    DynamicPartitionNumberLoader(Table table, Duration refreshInterval, int maxPartitionNum) {
+    DynamicPartitionNumberLoader(
+            FileStoreTable table, Duration refreshInterval, int maxPartitionNum) {
         super(table, refreshInterval);
         this.maxPartitionNum = maxPartitionNum;
         LOG.info(
@@ -46,8 +47,7 @@ public class DynamicPartitionNumberLoader extends DynamicPartitionLoader {
                 maxPartitionNum);
     }
 
-    @Override
-    public List<BinaryRow> getMaxPartitions() {
+    protected List<BinaryRow> getMaxPartitions() {
         List<BinaryRow> newPartitions =
                 table.newReadBuilder().newScan().listPartitions().stream()
                         .sorted(comparator.reversed())
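
Both dynamic loaders implement getMaxPartitions() the same way, as this hunk starts to show: list all partitions, sort them newest-first with the partition comparator, and keep a bounded number. A condensed sketch of that step (helper name hypothetical; the calls are those visible in the diff):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.table.FileStoreTable;

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    final class MaxPartitionsSketch {
        static List<BinaryRow> latest(
                FileStoreTable table, Comparator<InternalRow> comparator, int limit) {
            // Scan the table's partition list, newest partition first,
            // keeping at most `limit` partitions.
            return table.newReadBuilder().newScan().listPartitions().stream()
                    .sorted(comparator.reversed())
                    .limit(limit)
                    .collect(Collectors.toList());
        }
    }
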
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 121a86d782..7df4742101 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -227,7 +227,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
             List<BinaryRow> partitions = partitionLoader.partitions();
             if (!partitions.isEmpty()) {
                 lookupTable.specifyPartitions(
-                        partitions, partitionLoader.createSpecificPartFilter(partitions));
+                        partitions, partitionLoader.createSpecificPartFilter());
             }
         }
 
@@ -329,7 +329,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
             if (partitionChanged) {
                 // reopen with latest partition
                 lookupTable.specifyPartitions(
-                        partitions, partitionLoader.createSpecificPartFilter(partitions));
+                        partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
                 lookupTable.close();
                 lookupTable.open();
                 // no need to refresh the lookup table because it is reopened
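
The two hunks above restore the zero-argument createSpecificPartFilter(): the filter is built from the loader's own partition list rather than a caller-supplied one. The surrounding refresh flow, condensed (a sketch assuming the LookupTable interface from the same package; not the full method):

    import org.apache.paimon.flink.lookup.LookupTable;
    import org.apache.paimon.flink.lookup.PartitionLoader;

    final class RefreshSketch {
        static void refresh(PartitionLoader loader, LookupTable lookupTable) throws Exception {
            // When the loader reports a partition change, re-scope the lookup
            // table to the new partitions and reopen it.
            if (loader.checkRefresh()) {
                lookupTable.specifyPartitions(
                        loader.partitions(), loader.createSpecificPartFilter());
                lookupTable.close();
                lookupTable.open();
                // No separate refresh needed: reopening reloads the data.
            }
        }
    }
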
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
index 72f7777f01..701c40b67c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
@@ -24,7 +24,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ParameterUtils;
 import org.apache.paimon.utils.Preconditions;
@@ -44,15 +44,15 @@ public abstract class PartitionLoader implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private static final String MAX_PT = "max_pt()";
-    private static final String MAX_TWO_PT = "max_two_pt()";
+    protected static final String MAX_PT = "max_pt()";
+    protected static final String MAX_TWO_PT = "max_two_pt()";
 
-    protected final Table table;
+    protected final FileStoreTable table;
     private final RowDataToObjectArrayConverter partitionConverter;
 
     protected transient List<BinaryRow> partitions;
 
-    protected PartitionLoader(Table table) {
+    protected PartitionLoader(FileStoreTable table) {
         this.table = table;
         this.partitionConverter =
                 new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
@@ -76,7 +76,7 @@ public abstract class PartitionLoader implements Serializable {
         partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add);
     }
 
-    public Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
+    public Predicate createSpecificPartFilter() {
         Predicate partFilter = null;
         for (BinaryRow partition : partitions) {
             if (partFilter == null) {
@@ -105,7 +105,7 @@ public abstract class PartitionLoader implements Serializable {
     public abstract boolean checkRefresh();
 
     @Nullable
-    public static PartitionLoader of(Table table) {
+    public static PartitionLoader of(FileStoreTable table) {
         Options options = Options.fromMap(table.options());
         String scanPartitions = options.get(FlinkConnectorOptions.SCAN_PARTITIONS);
         if (scanPartitions == null) {
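
For orientation, PartitionLoader.of(...) is the single entry point that interprets 'scan.partitions': it returns null when the option is unset, a StaticPartitionLoader for concrete specs, and a dynamic loader for max_pt()/max_two_pt(). A small usage sketch (helper hypothetical; the value-to-loader mapping is inferred from the constants and classes in this patch):

    import org.apache.paimon.flink.lookup.PartitionLoader;
    import org.apache.paimon.table.FileStoreTable;

    import javax.annotation.Nullable;

    final class LoaderFactorySketch {
        // 'scan.partitions' unset        -> null (no partition scoping)
        // 'max_pt()' / 'max_two_pt()'    -> dynamic loader, refreshed on an interval
        // 'k1=v1,k2=v2;k1=v3,...'        -> static loader over the listed specs
        @Nullable
        static PartitionLoader openLoader(FileStoreTable table) {
            PartitionLoader loader = PartitionLoader.of(table);
            if (loader != null) {
                loader.open();
            }
            return loader;
        }
    }
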
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
index bdfecc39f9..b6387a20a5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
@@ -20,8 +20,7 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 
@@ -29,14 +28,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
-
 /** {@link PartitionLoader} for specified static partitions. */
 public class StaticPartitionLoader extends PartitionLoader {
 
     private final List<Map<String, String>> scanPartitions;
 
-    protected StaticPartitionLoader(Table table, List<Map<String, String>> scanPartitions) {
+    protected StaticPartitionLoader(
+            FileStoreTable table, List<Map<String, String>> scanPartitions) {
         super(table);
         this.scanPartitions = scanPartitions;
     }
@@ -44,13 +42,12 @@ public class StaticPartitionLoader extends PartitionLoader {
     @Override
     public void open() {
         partitions = new ArrayList<>();
-        RowType partitionType = table.rowType().project(table.partitionKeys());
-        String defaultPartitionName = Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
+        RowType partitionType = table.schema().logicalPartitionType();
         InternalRowSerializer serializer = new InternalRowSerializer(partitionType);
         for (Map<String, String> spec : scanPartitions) {
             GenericRow row =
                     InternalRowPartitionComputer.convertSpecToInternalRow(
-                            spec, partitionType, defaultPartitionName);
+                            spec, partitionType, table.coreOptions().partitionDefaultName());
             partitions.add(serializer.toBinaryRow(row).copy());
         }
     }
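
The restored open() above turns each textual partition spec into a BinaryRow. That conversion in isolation (hypothetical helper; same calls as the hunk):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.data.serializer.InternalRowSerializer;
    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.types.RowType;
    import org.apache.paimon.utils.InternalRowPartitionComputer;

    import java.util.Map;

    final class SpecToRowSketch {
        static BinaryRow toBinaryRow(FileStoreTable table, Map<String, String> spec) {
            // Project the partition columns, map the spec (e.g. {pt=a}) onto
            // them, honoring the default partition name sentinel, then
            // serialize to a compact, reusable BinaryRow.
            RowType partitionType = table.schema().logicalPartitionType();
            GenericRow row =
                    InternalRowPartitionComputer.convertSpecToInternalRow(
                            spec, partitionType, table.coreOptions().partitionDefaultName());
            return new InternalRowSerializer(partitionType).toBinaryRow(row).copy();
        }
    }
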
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 2ee8f235d5..d0e79e2a05 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -19,13 +19,9 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
-import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
-import org.apache.paimon.flink.lookup.PartitionLoader;
-import org.apache.paimon.flink.lookup.StaticPartitionLoader;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -34,9 +30,9 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.DataTable;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -56,7 +52,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARTITIONS;
 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
@@ -142,42 +137,43 @@ public abstract class FlinkTableSource
 
     /**
      * This method is only used for normal source (not lookup source). Specified partitions in
-     * lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}. But we
-     * use PartitionLoader to create partition predicate. It's possible that user use max_pt() or
-     * max_two_pt() in batch join, so we should also handle it.
+     * lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
      */
     private PartitionPredicate getPartitionPredicateWithOptions() {
-        PartitionLoader partitionLoader = PartitionLoader.of((FileStoreTable) table);
-        if (partitionLoader == null) {
-            return null;
-        }
+        if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
+            try {
+                Predicate predicate =
+                        PartitionPredicate.createPartitionPredicate(
+                                ParameterUtils.getPartitions(
+                                        options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
+                                                .split(";")),
+                                table.rowType(),
+                                options.get(CoreOptions.PARTITION_DEFAULT_NAME));
+                // Partition filter will be used to filter Manifest stats, the stats schema is
+                // partition type. See SnapshotReaderImpl#withFilter
+                Predicate transformed =
+                        transformFieldMapping(
+                                        predicate,
+                                        PredicateBuilder.fieldIdxToPartitionIdx(
+                                                table.rowType(), table.partitionKeys()))
+                                .orElseThrow(
+                                        () ->
+                                                new RuntimeException(
+                                                        "Failed to transform the partition predicate "
+                                                                + predicate));
+                return PartitionPredicate.fromPredicate(
+                        table.rowType().project(table.partitionKeys()), transformed);
+            } catch (IllegalArgumentException e) {
+                // In older versions of Flink, however, lookup sources will first be treated as
+                // normal sources. So this method will also be visited by lookup tables, whose
+                // option value might be max_pt() or max_two_pt(). In this case we ignore the
+                // filters.
+                return null;
+            }
 
-        partitionLoader.open();
-        List<BinaryRow> partitions;
-        if (partitionLoader instanceof StaticPartitionLoader) {
-            partitions = partitionLoader.partitions();
-        } else if (partitionLoader instanceof DynamicPartitionLoader) {
-            partitions = ((DynamicPartitionLoader) partitionLoader).getMaxPartitions();
         } else {
-            throw new RuntimeException(
-                    "Failed to handle scan.partitions = " + options.get(SCAN_PARTITIONS));
+            return null;
         }
-
-        // Partition filter will be used to filter Manifest stats, the stats schema is
-        // partition type. See SnapshotReaderImpl#withFilter
-        Predicate predicate = partitionLoader.createSpecificPartFilter(partitions);
-        Predicate transformed =
-                transformFieldMapping(
-                                predicate,
-                                PredicateBuilder.fieldIdxToPartitionIdx(
-                                        table.rowType(), table.partitionKeys()))
-                        .orElseThrow(
-                                () ->
-                                        new RuntimeException(
-                                                "Failed to transform the 
partition predicate "
-                                                        + predicate));
-        return PartitionPredicate.fromPredicate(
-                table.rowType().project(table.partitionKeys()), transformed);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index a81cf7a506..589eeb14af 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -902,46 +902,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), Row.of(2, 22));
     }
 
-    @Test
-    public void testScanWithSpecifiedPartitionsWithMaxPt() {
-        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
-        sql("CREATE TABLE Q (id INT)");
-        sql(
-                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 
'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
-        sql("INSERT INTO Q VALUES (1), (2), (3)");
-        String query =
-                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'max_pt()') */ ON Q.id = P.id ORDER BY Q.id, P.v";
-        assertThat(sql(query)).containsExactly(Row.of(1, 12), Row.of(2, 22), 
Row.of(3, 32));
-    }
-
-    @Test
-    public void testScanWithSpecifiedPartitionsWithMaxTwoPt() {
-        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
-        sql("CREATE TABLE Q (id INT)");
-        sql(
-                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 
'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
-        sql("INSERT INTO Q VALUES (1), (2), (3)");
-        String query =
-                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'max_two_pt()') */ ON Q.id = P.id ORDER BY Q.id, 
P.v";
-        assertThat(sql(query))
-                .containsExactly(
-                        Row.of(1, 11), Row.of(1, 12), Row.of(2, 22), Row.of(3, 
31), Row.of(3, 32));
-    }
-
-    @Test
-    public void testScanWithSpecifiedPartitionsWithLevelMaxPt() throws Exception {
-        sql(
-                "CREATE TABLE P (id INT, v INT, pt1 STRING, pt2 STRING, pt3 STRING) PARTITIONED BY (pt1, pt2, pt3)");
-        sql("CREATE TABLE Q (id INT)");
-        sql(
-                "INSERT INTO P VALUES (1, 10, 'a', '2025-10-01', '1'), (2, 20, 
'a', '2025-10-01', '2'), (3, 30, 'a', '2025-10-02', '1'), (4, 40, 'a', 
'2025-10-02', '2'), "
-                        + "(1, 11, 'b', '2025-10-01', '1'), (2, 21, 'b', 
'2025-10-01', '2'), (3, 31, 'b', '2025-10-02', '1'), (4, 41, 'b', '2025-10-02', 
'2')");
-        sql("INSERT INTO Q VALUES (1), (2), (3), (4)");
-        String query =
-                "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'pt1=max_pt(),pt2=max_pt()') */ ON Q.id = P.id 
ORDER BY Q.id, P.v";
-        assertThat(sql(query)).containsExactly(Row.of(3, 31), Row.of(4, 41));
-    }
-
     @Test
     public void testEmptyTableIncrementalBetweenTimestamp() {
         assertThat(sql("SELECT * FROM T /*+ 
OPTIONS('incremental-between-timestamp'='0,1') */"))
