This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 30d8c1cf5d [flink] Flink batch job support specify partition with max_pt() and max_two_pt() (#6728)
30d8c1cf5d is described below

commit 30d8c1cf5d50e9d44431ca4eb4b352821097d0b5
Author: yuzelin <[email protected]>
AuthorDate: Mon Dec 15 13:46:32 2025 +0800

    [flink] Flink batch job support specify partition with max_pt() and max_two_pt() (#6728)
---
 .../generated/flink_connector_configuration.html   |  2 +-
 .../paimon/partition/PartitionPredicate.java       | 35 +++++++----
 .../apache/paimon/flink/FlinkConnectorOptions.java |  7 ++-
 .../flink/lookup/DynamicPartitionLevelLoader.java  | 13 ++--
 .../flink/lookup/DynamicPartitionLoader.java       |  6 +-
 .../flink/lookup/DynamicPartitionNumberLoader.java |  8 +--
 .../paimon/flink/lookup/PartitionLoader.java       | 39 +++---------
 .../paimon/flink/lookup/StaticPartitionLoader.java | 13 ++--
 .../paimon/flink/source/FlinkTableSource.java      | 73 ++++++++++++----------
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 46 ++++++++++++++
 10 files changed, 145 insertions(+), 97 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 30aa9dea29..0549fde466 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -180,7 +180,7 @@ under the License.
             <td><h5>scan.partitions</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Specify the partitions to scan. Partitions should be given in 
the form of key1=value1,key2=value2. Partition keys not specified will be 
filled with the value of partition.default-name. Multiple partitions should be 
separated by semicolon (;). This option can support normal source tables and 
lookup join tables. For lookup joins, two special values max_pt() and 
max_two_pt() are also supported, specifying the (two) partition(s) with the 
largest partition value.</td>
+            <td>Specify the partitions to scan. Partitions should be given in 
the form of key1=value1,key2=value2. Partition keys not specified will be 
filled with the value of partition.default-name. Multiple partitions should be 
separated by semicolon (;). This option can support normal source tables and 
lookup join tables. There are two special values max_pt() and max_two_pt() are 
also supported to specify the (two) partition(s) with the largest partition 
value. For lookup source, the [...]
         </tr>
         <tr>
             <td><h5>scan.remove-normalize</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 1f041f5153..cb4e952664 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -273,25 +273,34 @@ public interface PartitionPredicate extends Serializable {
         return predicate;
     }
 
-    static Predicate createPartitionPredicate(RowType partitionType, Object[] 
partition) {
-        Preconditions.checkArgument(
-                partition.length == partitionType.getFieldCount(),
-                "Partition's field count should be equal to partitionType's 
field count.");
-
-        Map<String, Object> partitionMap = new HashMap<>(partition.length);
-        for (int i = 0; i < partition.length; i++) {
-            partitionMap.put(partitionType.getFields().get(i).name(), 
partition[i]);
+    static Predicate createPartitionPredicate(
+            RowType rowType, RowDataToObjectArrayConverter converter, 
List<BinaryRow> partitions) {
+        Predicate partFilter = null;
+        for (BinaryRow partition : partitions) {
+            if (partFilter == null) {
+                partFilter = createSinglePartitionPredicate(rowType, 
converter, partition);
+            } else {
+                partFilter =
+                        PredicateBuilder.or(
+                                partFilter,
+                                createSinglePartitionPredicate(rowType, 
converter, partition));
+            }
         }
-
-        return createPartitionPredicate(partitionType, partitionMap);
+        return partFilter;
     }
 
-    static Predicate createPartitionPredicate(RowType partitionType, BinaryRow 
partition) {
+    static Predicate createSinglePartitionPredicate(
+            RowType rowType, RowDataToObjectArrayConverter converter, 
BinaryRow partition) {
+        RowType partitionType = converter.rowType();
         Preconditions.checkArgument(
                 partition.getFieldCount() == partitionType.getFieldCount(),
                 "Partition's field count should be equal to partitionType's 
field count.");
-        RowDataToObjectArrayConverter converter = new 
RowDataToObjectArrayConverter(partitionType);
-        return createPartitionPredicate(partitionType, 
converter.convert(partition));
+        Object[] partitionSpec = converter.convert(partition);
+        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
+        for (int i = 0; i < partitionSpec.length; i++) {
+            partitionMap.put(partitionType.getFields().get(i).name(), 
partitionSpec[i]);
+        }
+        return createPartitionPredicate(rowType, partitionMap);
     }
 
     @Nullable
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index eaf959b477..a1089d991a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -302,8 +302,11 @@ public class FlinkConnectorOptions {
                                     + CoreOptions.PARTITION_DEFAULT_NAME.key()
                                     + ". Multiple partitions should be 
separated by semicolon (;). "
                                     + "This option can support normal source 
tables and lookup join tables. "
-                                    + "For lookup joins, two special values 
max_pt() and max_two_pt() are also supported, "
-                                    + "specifying the (two) partition(s) with 
the largest partition value.");
+                                    + "There are two special values max_pt() 
and max_two_pt() are also supported "
+                                    + "to specify the (two) partition(s) with 
the largest partition value. For "
+                                    + "lookup source, the max partition(s) 
will be periodically refreshed; for "
+                                    + "normal source, the max partition(s) 
will be determined before job running "
+                                    + "without refreshing even for streaming 
jobs.");
 
     public static final ConfigOption<Duration> 
LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL =
             ConfigOptions.key("lookup.dynamic-partition.refresh-interval")
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
index e4ffe2a5e8..88920c05fc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
@@ -20,7 +20,8 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
@@ -34,6 +35,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
+
 /** Dynamic partition loader which can specify the partition level to load for 
lookup. */
 public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
 
@@ -47,14 +50,12 @@ public class DynamicPartitionLevelLoader extends 
DynamicPartitionLoader {
     private final String defaultPartitionName;
 
     DynamicPartitionLevelLoader(
-            FileStoreTable table,
-            Duration refreshInterval,
-            Map<String, String> partitionLoadConfig) {
+            Table table, Duration refreshInterval, Map<String, String> 
partitionLoadConfig) {
         super(table, refreshInterval);
         maxPartitionLoadLevel =
                 getMaxPartitionLoadLevel(partitionLoadConfig, 
table.partitionKeys());
         fieldGetters = createPartitionFieldGetters();
-        defaultPartitionName = table.coreOptions().partitionDefaultName();
+        defaultPartitionName = 
Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
 
         LOG.info(
                 "Init 
DynamicPartitionLevelLoader(table={}),maxPartitionLoadLevel is {}",
@@ -63,7 +64,7 @@ public class DynamicPartitionLevelLoader extends 
DynamicPartitionLoader {
     }
 
     @Override
-    protected List<BinaryRow> getMaxPartitions() {
+    public List<BinaryRow> getMaxPartitions() {
         List<BinaryRow> newPartitions =
                 table.newReadBuilder().newScan().listPartitions().stream()
                         .sorted(comparator.reversed())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 201997da55..37671dee0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.lookup;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 
@@ -46,7 +46,7 @@ public abstract class DynamicPartitionLoader extends 
PartitionLoader {
     protected transient Comparator<InternalRow> comparator;
     protected transient LocalDateTime lastRefresh;
 
-    DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval) {
+    DynamicPartitionLoader(Table table, Duration refreshInterval) {
         super(table);
         this.refreshInterval = refreshInterval;
     }
@@ -92,7 +92,7 @@ public abstract class DynamicPartitionLoader extends 
PartitionLoader {
         }
     }
 
-    protected abstract List<BinaryRow> getMaxPartitions();
+    public abstract List<BinaryRow> getMaxPartitions();
 
     private void logNewPartitions() {
         String partitionsStr = partitionsToString(partitions);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
index 52c1d4c76c..7e210725de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +37,7 @@ public class DynamicPartitionNumberLoader extends 
DynamicPartitionLoader {
 
     private final int maxPartitionNum;
 
-    DynamicPartitionNumberLoader(
-            FileStoreTable table, Duration refreshInterval, int 
maxPartitionNum) {
+    DynamicPartitionNumberLoader(Table table, Duration refreshInterval, int 
maxPartitionNum) {
         super(table, refreshInterval);
         this.maxPartitionNum = maxPartitionNum;
         LOG.info(
@@ -47,7 +46,8 @@ public class DynamicPartitionNumberLoader extends 
DynamicPartitionLoader {
                 maxPartitionNum);
     }
 
-    protected List<BinaryRow> getMaxPartitions() {
+    @Override
+    public List<BinaryRow> getMaxPartitions() {
         List<BinaryRow> newPartitions =
                 table.newReadBuilder().newScan().listPartitions().stream()
                         .sorted(comparator.reversed())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
index 701c40b67c..638e5254c7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
@@ -23,9 +23,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.RowType;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.ParameterUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
@@ -35,24 +33,22 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /** Specify partitions for lookup tables. */
 public abstract class PartitionLoader implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    protected static final String MAX_PT = "max_pt()";
-    protected static final String MAX_TWO_PT = "max_two_pt()";
+    private static final String MAX_PT = "max_pt()";
+    private static final String MAX_TWO_PT = "max_two_pt()";
 
-    protected final FileStoreTable table;
+    protected final Table table;
     private final RowDataToObjectArrayConverter partitionConverter;
 
     protected transient List<BinaryRow> partitions;
 
-    protected PartitionLoader(FileStoreTable table) {
+    protected PartitionLoader(Table table) {
         this.table = table;
         this.partitionConverter =
                 new 
RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
@@ -77,35 +73,16 @@ public abstract class PartitionLoader implements 
Serializable {
     }
 
     public Predicate createSpecificPartFilter() {
-        Predicate partFilter = null;
-        for (BinaryRow partition : partitions) {
-            if (partFilter == null) {
-                partFilter = createSinglePartFilter(partition);
-            } else {
-                partFilter = PredicateBuilder.or(partFilter, 
createSinglePartFilter(partition));
-            }
-        }
-        return partFilter;
-    }
-
-    private Predicate createSinglePartFilter(BinaryRow partition) {
-        RowType rowType = table.rowType();
-        List<String> partitionKeys = table.partitionKeys();
-        Object[] partitionSpec = partitionConverter.convert(partition);
-        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
-        for (int i = 0; i < partitionSpec.length; i++) {
-            partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
-        }
-
         // create partition predicate base on rowType instead of partitionType
-        return PartitionPredicate.createPartitionPredicate(rowType, 
partitionMap);
+        return PartitionPredicate.createPartitionPredicate(
+                table.rowType(), partitionConverter, partitions);
     }
 
     /** @return true if partition changed. */
     public abstract boolean checkRefresh();
 
     @Nullable
-    public static PartitionLoader of(FileStoreTable table) {
+    public static PartitionLoader of(Table table) {
         Options options = Options.fromMap(table.options());
         String scanPartitions = 
options.get(FlinkConnectorOptions.SCAN_PARTITIONS);
         if (scanPartitions == null) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
index b6387a20a5..bdfecc39f9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
@@ -20,7 +20,8 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 
@@ -28,13 +29,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
+
 /** {@link PartitionLoader} for specified static partitions. */
 public class StaticPartitionLoader extends PartitionLoader {
 
     private final List<Map<String, String>> scanPartitions;
 
-    protected StaticPartitionLoader(
-            FileStoreTable table, List<Map<String, String>> scanPartitions) {
+    protected StaticPartitionLoader(Table table, List<Map<String, String>> 
scanPartitions) {
         super(table);
         this.scanPartitions = scanPartitions;
     }
@@ -42,12 +44,13 @@ public class StaticPartitionLoader extends PartitionLoader {
     @Override
     public void open() {
         partitions = new ArrayList<>();
-        RowType partitionType = table.schema().logicalPartitionType();
+        RowType partitionType = table.rowType().project(table.partitionKeys());
+        String defaultPartitionName = 
Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
         InternalRowSerializer serializer = new 
InternalRowSerializer(partitionType);
         for (Map<String, String> spec : scanPartitions) {
             GenericRow row =
                     InternalRowPartitionComputer.convertSpecToInternalRow(
-                            spec, partitionType, 
table.coreOptions().partitionDefaultName());
+                            spec, partitionType, defaultPartitionName);
             partitions.add(serializer.toBinaryRow(row).copy());
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index d0e79e2a05..66cb49798a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -19,9 +19,13 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
+import org.apache.paimon.flink.lookup.PartitionLoader;
+import org.apache.paimon.flink.lookup.StaticPartitionLoader;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -32,7 +36,7 @@ import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.utils.ParameterUtils;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -52,8 +56,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARTITIONS;
 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
-import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A Flink {@link ScanTableSource} for paimon. */
 public abstract class FlinkTableSource
@@ -137,41 +141,46 @@ public abstract class FlinkTableSource
 
     /**
      * This method is only used for normal source (not lookup source). 
Specified partitions in
-     * lookup sources are handled in {@link 
org.apache.paimon.flink.lookup.PartitionLoader}.
+     * lookup sources are handled in {@link 
org.apache.paimon.flink.lookup.PartitionLoader}. But
+     * it's possible that user use max_pt() or max_two_pt() in batch join, so 
use PartitionLoader to
+     * create partition predicate.
      */
     private PartitionPredicate getPartitionPredicateWithOptions() {
-        if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
-            try {
-                Predicate predicate =
-                        PartitionPredicate.createPartitionPredicate(
-                                ParameterUtils.getPartitions(
-                                        
options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
-                                                .split(";")),
-                                table.rowType(),
-                                
options.get(CoreOptions.PARTITION_DEFAULT_NAME));
-                // Partition filter will be used to filter Manifest stats, the 
stats schema is
-                // partition type. See SnapshotReaderImpl#withFilter
-                Predicate transformed =
-                        transformFieldMapping(
-                                        predicate,
-                                        
PredicateBuilder.fieldIdxToPartitionIdx(
-                                                table.rowType(), 
table.partitionKeys()))
-                                .orElseThrow(
-                                        () ->
-                                                new RuntimeException(
-                                                        "Failed to transform 
the partition predicate "
-                                                                + predicate));
-                return PartitionPredicate.fromPredicate(
-                        table.rowType().project(table.partitionKeys()), 
transformed);
-            } catch (IllegalArgumentException e) {
-                // In older versions of Flink, however, lookup sources will 
first be treated as
-                // normal sources. So this method will also be visited by 
lookup tables, whose
-                // option value might be max_pt() or max_two_pt(). In this 
case we ignore the
-                // filters.
+        try {
+            PartitionLoader partitionLoader = PartitionLoader.of(table);
+            if (partitionLoader == null) {
                 return null;
             }
 
-        } else {
+            partitionLoader.open();
+            List<BinaryRow> partitions;
+            if (partitionLoader instanceof StaticPartitionLoader) {
+                partitions = partitionLoader.partitions();
+            } else if (partitionLoader instanceof DynamicPartitionLoader) {
+                partitions = ((DynamicPartitionLoader) 
partitionLoader).getMaxPartitions();
+            } else {
+                throw new RuntimeException(
+                        "Failed to handle scan.partitions = " + 
options.get(SCAN_PARTITIONS));
+            }
+            if (partitions.isEmpty()) {
+                return null;
+            }
+
+            // Partition filter will be used to filter Manifest stats, the 
stats schema is
+            // partition type. See SnapshotReaderImpl#withFilter
+            org.apache.paimon.types.RowType partitionType =
+                    table.rowType().project(table.partitionKeys());
+            Predicate predicate =
+                    PartitionPredicate.createPartitionPredicate(
+                            partitionType,
+                            new RowDataToObjectArrayConverter(partitionType),
+                            partitions);
+            return PartitionPredicate.fromPredicate(partitionType, predicate);
+        } catch (IllegalArgumentException e) {
+            // In older versions of Flink, however, lookup sources will first 
be treated as normal
+            // sources. So this method will also be visited by lookup tables, 
and the options might
+            // cause IllegalArgumentException. In this case we ignore the 
filters.
+            LOG.info("Failed to get filter with table options {} ", 
table.options(), e);
             return null;
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index af0d7d508d..1a10eca201 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -928,6 +928,52 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), 
Row.of(2, 22));
     }
 
+    @Test
+    public void testScanWithSpecifiedPartitionsWithMaxPt() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT, `proctime` AS PROCTIME())");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 
'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2), (3)");
+        String query =
+                ThreadLocalRandom.current().nextBoolean()
+                        ? "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'max_pt()') */ FOR SYSTEM_TIME AS OF Q.proctime ON 
Q.id = P.id"
+                        : "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'max_pt()') */ ON Q.id = P.id";
+        assertThat(sql(query)).containsExactly(Row.of(1, 12), Row.of(2, 22), 
Row.of(3, 32));
+    }
+
+    @Test
+    public void testScanWithSpecifiedPartitionsWithMaxTwoPt() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT, `proctime` AS PROCTIME())");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 
'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2), (3)");
+        String query =
+                ThreadLocalRandom.current().nextBoolean()
+                        ? "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'max_two_pt()') */ FOR SYSTEM_TIME AS OF Q.proctime 
ON Q.id = P.id"
+                        : "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'max_two_pt()') */ ON Q.id = P.id";
+        assertThat(sql(query))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11), Row.of(1, 12), Row.of(2, 22), Row.of(3, 
31), Row.of(3, 32));
+    }
+
+    @Test
+    public void testScanWithSpecifiedPartitionsWithLevelMaxPt() throws 
Exception {
+        sql(
+                "CREATE TABLE P (id INT, v INT, pt1 STRING, pt2 STRING, pt3 
STRING) PARTITIONED BY (pt1, pt2, pt3)");
+        sql("CREATE TABLE Q (id INT, `proctime` AS PROCTIME())");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a', '2025-10-01', '1'), (2, 20, 
'a', '2025-10-01', '2'), (3, 30, 'a', '2025-10-02', '1'), (4, 40, 'a', 
'2025-10-02', '2'), "
+                        + "(1, 11, 'b', '2025-10-01', '1'), (2, 21, 'b', 
'2025-10-01', '2'), (3, 31, 'b', '2025-10-02', '1'), (4, 41, 'b', '2025-10-02', 
'2')");
+        sql("INSERT INTO Q VALUES (1), (2), (3), (4)");
+        String query =
+                ThreadLocalRandom.current().nextBoolean()
+                        ? "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'pt1=max_pt(),pt2=max_pt()') */ FOR SYSTEM_TIME AS 
OF Q.proctime ON Q.id = P.id"
+                        : "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ 
OPTIONS('scan.partitions' = 'pt1=max_pt(),pt2=max_pt()') */ ON Q.id = P.id";
+        assertThat(sql(query)).containsExactly(Row.of(3, 31), Row.of(4, 41));
+    }
+
     @Test
     public void testEmptyTableIncrementalBetweenTimestamp() {
         assertThat(sql("SELECT * FROM T /*+ 
OPTIONS('incremental-between-timestamp'='0,1') */"))

Reply via email to