This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e7aa863794 [flink] support specify max_pt for partition level (#5831)
e7aa863794 is described below

commit e7aa8637949523b80eb96d0ea419c288b1a5bace
Author: LsomeYeah <[email protected]>
AuthorDate: Fri Jul 4 18:40:49 2025 +0800

    [flink] support specify max_pt for partition level (#5831)
---
 docs/content/flink/sql-lookup.md                   |   6 +
 .../flink/lookup/DynamicPartitionLevelLoader.java  | 150 +++++++++++++
 .../flink/lookup/DynamicPartitionLoader.java       |  60 ++---
 .../flink/lookup/DynamicPartitionNumberLoader.java |  62 ++++++
 .../flink/lookup/FileStoreLookupFunction.java      |   2 +-
 .../paimon/flink/lookup/PartitionLoader.java       |  18 +-
 .../org/apache/paimon/flink/LookupJoinITCase.java  |  85 ++++++++
 .../lookup/DynamicPartitionLevelLoaderTest.java    | 241 +++++++++++++++++++++
 8 files changed, 581 insertions(+), 43 deletions(-)

diff --git a/docs/content/flink/sql-lookup.md b/docs/content/flink/sql-lookup.md
index 537af3cd12..2e2e0e30ed 100644
--- a/docs/content/flink/sql-lookup.md
+++ b/docs/content/flink/sql-lookup.md
@@ -173,6 +173,12 @@ The option `scan.partitions` can also specify fixed partitions in the form of `k
 Multiple partitions should be separated by semicolon (`;`).
 When specifying fixed partitions, this option can also be used in batch joins.
 
+The option `scan.partitions` can also specify `max_pt()` for a parent partition, in the form of `key1=max_pt(),key2=max_pt()`.
+All subpartitions of the latest parent partition will be loaded. For example, if the partition keys are `'year', 'day', 'hh'`,
+you can specify `year=max_pt()`; it will find the latest `year` partition and load all of its subpartitions for lookup.
+Partitions can only be specified hierarchically from the top level down. For example, setting `year=max_pt(),hh=max_pt()` is
+invalid, because `hh=max_pt()` skips the `day` level.
+
 ## Query Service
 
 You can run a Flink Streaming Job to start query service for the table. When QueryService exists, Flink Lookup Join
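
As an illustration of the documentation added above, a dimension table using the new partition-level max_pt() could be declared as follows. This is a minimal sketch, not part of the commit: the table and column names are hypothetical, while the option keys and values follow the docs and tests in this patch.

    -- Hypothetical dimension table partitioned by (year, day, hh).
    -- 'year=max_pt(),day=max_pt()' loads every hh subpartition under the
    -- latest (year, day) parent partition, refreshed on the given interval.
    CREATE TABLE dim_orders (
        `year` STRING,
        `day` STRING,
        `hh` STRING,
        order_id INT,
        amount INT
    ) PARTITIONED BY (`year`, `day`, `hh`) WITH (
        'scan.partitions' = 'year=max_pt(),day=max_pt()',
        'lookup.dynamic-partition.refresh-interval' = '10 min'
    );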
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
new file mode 100644
index 0000000000..e4ffe2a5e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Dynamic partition loader which can specify the partition level to load for lookup. */
+public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLevelLoader.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private final int maxPartitionLoadLevel;
+    private final List<InternalRow.FieldGetter> fieldGetters;
+
+    private final String defaultPartitionName;
+
+    DynamicPartitionLevelLoader(
+            FileStoreTable table,
+            Duration refreshInterval,
+            Map<String, String> partitionLoadConfig) {
+        super(table, refreshInterval);
+        maxPartitionLoadLevel =
+                getMaxPartitionLoadLevel(partitionLoadConfig, table.partitionKeys());
+        fieldGetters = createPartitionFieldGetters();
+        defaultPartitionName = table.coreOptions().partitionDefaultName();
+
+        LOG.info(
+                "Init 
DynamicPartitionLevelLoader(table={}),maxPartitionLoadLevel is {}",
+                table.name(),
+                maxPartitionLoadLevel);
+    }
+
+    @Override
+    protected List<BinaryRow> getMaxPartitions() {
+        List<BinaryRow> newPartitions =
+                table.newReadBuilder().newScan().listPartitions().stream()
+                        .sorted(comparator.reversed())
+                        .collect(Collectors.toList());
+
+        if (maxPartitionLoadLevel == table.partitionKeys().size() - 1) {
+            // if maxPartitionLoadLevel is the max partition level, we only need to load the max
+            // partition
+            if (newPartitions.size() <= 1) {
+                return newPartitions;
+            } else {
+                return newPartitions.subList(0, 1);
+            }
+        }
+
+        newPartitions = extractMaxPartitionsForFixedLevel(newPartitions, maxPartitionLoadLevel);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "DynamicPartitionLevelLoader(currentPartitionLoadLevel={},table={}) finds new partitions: {}.",
+                    maxPartitionLoadLevel,
+                    table.name(),
+                    partitionsToString(newPartitions));
+        }
+
+        return newPartitions;
+    }
+
+    private int getMaxPartitionLoadLevel(Map<String, String> toLoad, List<String> fields) {
+        Preconditions.checkArgument(toLoad.size() <= fields.size());
+        int maxLoadLevel = fields.size() - 1;
+        for (int i = 0; i < fields.size(); i++) {
+            if (!toLoad.containsKey(fields.get(i))) {
+                maxLoadLevel = i - 1;
+                break;
+            }
+        }
+        Preconditions.checkArgument(
+                maxLoadLevel >= 0, "the top level partition must set load config.");
+        for (int i = maxLoadLevel + 1; i < fields.size(); i++) {
+            Preconditions.checkArgument(
+                    !toLoad.containsKey(fields.get(i)),
+                    "Max load level is %s, "
+                            + "but partition field %s with a higher level %s 
sets MAX_PT.",
+                    maxLoadLevel,
+                    fields.get(i),
+                    i);
+        }
+        return maxLoadLevel;
+    }
+
+    private List<InternalRow.FieldGetter> createPartitionFieldGetters() {
+        List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();
+
+        RowType partitionType = table.rowType().project(table.partitionKeys());
+
+        for (int i = 0; i < maxPartitionLoadLevel + 1; i++) {
+            fieldGetters.add(InternalRow.createFieldGetter(partitionType.getTypeAt(i), i));
+        }
+        return fieldGetters;
+    }
+
+    private List<BinaryRow> extractMaxPartitionsForFixedLevel(
+            List<BinaryRow> partitions, int level) {
+        int currentDistinct = 0;
+        Object[] lastFields = new Object[level + 1];
+        for (int i = 0; i < partitions.size(); i++) {
+            BinaryRow partition = partitions.get(i);
+            Object[] newFields = new Object[level + 1];
+            for (int j = 0; j <= level; j++) {
+                newFields[j] = fieldGetters.get(j).getFieldOrNull(partition);
+                if (newFields[j] == null) {
+                    newFields[j] = defaultPartitionName;
+                }
+            }
+            if (!Arrays.equals(newFields, lastFields)) {
+                lastFields = newFields;
+                if (++currentDistinct > 1) {
+                    return partitions.subList(0, i);
+                }
+            }
+        }
+        return partitions;
+    }
+}
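
To make the hierarchy rule in getMaxPartitionLoadLevel concrete, here is a standalone Java sketch, not part of the commit, that mirrors its level computation with simplified names and error messages. The load level is the deepest partition field such that it and every field above it set max_pt(); configurations that skip a level are rejected.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MaxLoadLevelSketch {

        // Mirrors DynamicPartitionLevelLoader#getMaxPartitionLoadLevel (simplified).
        static int maxLoadLevel(Set<String> configured, List<String> fields) {
            int maxLoadLevel = fields.size() - 1;
            for (int i = 0; i < fields.size(); i++) {
                if (!configured.contains(fields.get(i))) {
                    maxLoadLevel = i - 1;
                    break;
                }
            }
            if (maxLoadLevel < 0) {
                throw new IllegalArgumentException("the top level partition must set max_pt().");
            }
            // Every field below the computed level must be left unset.
            for (int i = maxLoadLevel + 1; i < fields.size(); i++) {
                if (configured.contains(fields.get(i))) {
                    throw new IllegalArgumentException(
                            "field " + fields.get(i) + " skips a level; partitions must be hierarchical.");
                }
            }
            return maxLoadLevel;
        }

        public static void main(String[] args) {
            List<String> keys = Arrays.asList("year", "day", "hh");
            // year=max_pt()              -> level 0: all day/hh under the latest year.
            System.out.println(maxLoadLevel(new HashSet<>(Arrays.asList("year")), keys));
            // year=max_pt(),day=max_pt() -> level 1: all hh under the latest (year, day).
            System.out.println(maxLoadLevel(new HashSet<>(Arrays.asList("year", "day")), keys));
            // year=max_pt(),hh=max_pt()  -> rejected: 'hh' skips the 'day' level.
            try {
                maxLoadLevel(new HashSet<>(Arrays.asList("year", "hh")), keys);
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }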
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 1683e8707a..201997da55 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -35,22 +35,20 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /** Dynamic partition for lookup. */
-public class DynamicPartitionLoader extends PartitionLoader {
+public abstract class DynamicPartitionLoader extends PartitionLoader {
 
     private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLoader.class);
 
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
 
-    private final Duration refreshInterval;
-    private final int maxPartitionNum;
+    protected final Duration refreshInterval;
 
-    private transient Comparator<InternalRow> comparator;
-    private transient LocalDateTime lastRefresh;
+    protected transient Comparator<InternalRow> comparator;
+    protected transient LocalDateTime lastRefresh;
 
-    DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval, int maxPartitionNum) {
+    DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval) {
         super(table);
         this.refreshInterval = refreshInterval;
-        this.maxPartitionNum = maxPartitionNum;
     }
 
     @Override
@@ -70,8 +68,7 @@ public class DynamicPartitionLoader extends PartitionLoader {
         }
 
         LOG.info(
-                "DynamicPartitionLoader(maxPartitionNum={},table={}) refreshed 
after {} second(s), refreshing",
-                maxPartitionNum,
+                "DynamicPartitionLoader(table={}) refreshed after {} 
second(s), refreshing",
                 table.name(),
                 refreshInterval.toMillis() / 1000);
 
@@ -90,42 +87,31 @@ public class DynamicPartitionLoader extends PartitionLoader {
                     return true;
                 }
             }
-            LOG.info(
-                    "DynamicPartitionLoader(maxPartitionNum={},table={}) 
didn't find new partitions.",
-                    maxPartitionNum,
-                    table.name());
+            LOG.info("DynamicPartitionLoader(table={}) didn't find new 
partitions.", table.name());
             return false;
         }
     }
 
+    protected abstract List<BinaryRow> getMaxPartitions();
+
     private void logNewPartitions() {
-        String partitionsStr =
-                partitions.stream()
-                        .map(
-                                partition ->
-                                        InternalRowPartitionComputer.partToSimpleString(
-                                                table.rowType().project(table.partitionKeys()),
-                                                partition,
-                                                "-",
-                                                200))
-                        .collect(Collectors.joining(","));
+        String partitionsStr = partitionsToString(partitions);
+
         LOG.info(
-                "DynamicPartitionLoader(maxPartitionNum={},table={}) finds new 
partitions: {}.",
-                maxPartitionNum,
+                "DynamicPartitionLoader(table={}) finds new partitions: {}.",
                 table.name(),
                 partitionsStr);
     }
 
-    private List<BinaryRow> getMaxPartitions() {
-        List<BinaryRow> newPartitions =
-                table.newReadBuilder().newScan().listPartitions().stream()
-                        .sorted(comparator.reversed())
-                        .collect(Collectors.toList());
-
-        if (newPartitions.size() <= maxPartitionNum) {
-            return newPartitions;
-        } else {
-            return newPartitions.subList(0, maxPartitionNum);
-        }
+    protected String partitionsToString(List<BinaryRow> partitions) {
+        return partitions.stream()
+                .map(
+                        partition ->
+                                InternalRowPartitionComputer.partToSimpleString(
+                                        table.rowType().project(table.partitionKeys()),
+                                        partition,
+                                        "-",
+                                        200))
+                .collect(Collectors.joining(","));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
new file mode 100644
index 0000000000..52c1d4c76c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Dynamic partition loader which can specify the max partition number to load for lookup. */
+public class DynamicPartitionNumberLoader extends DynamicPartitionLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionNumberLoader.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private final int maxPartitionNum;
+
+    DynamicPartitionNumberLoader(
+            FileStoreTable table, Duration refreshInterval, int maxPartitionNum) {
+        super(table, refreshInterval);
+        this.maxPartitionNum = maxPartitionNum;
+        LOG.info(
+                "Init DynamicPartitionNumberLoader(table={}),maxPartitionNum 
is {}",
+                table.name(),
+                maxPartitionNum);
+    }
+
+    protected List<BinaryRow> getMaxPartitions() {
+        List<BinaryRow> newPartitions =
+                table.newReadBuilder().newScan().listPartitions().stream()
+                        .sorted(comparator.reversed())
+                        .collect(Collectors.toList());
+
+        if (newPartitions.size() <= maxPartitionNum) {
+            return newPartitions;
+        } else {
+            return newPartitions.subList(0, maxPartitionNum);
+        }
+    }
+}
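
The two concrete loaders now differ only in how they truncate the partition list, which the shared comparator has already sorted newest-first. A standalone sketch, not part of the commit and using plain strings in place of BinaryRow, contrasts the two strategies:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class LoaderContrastSketch {

        // DynamicPartitionNumberLoader: keep at most the newest N partitions.
        static List<String> byNumber(List<String> sortedNewestFirst, int maxPartitionNum) {
            return sortedNewestFirst.subList(0, Math.min(sortedNewestFirst.size(), maxPartitionNum));
        }

        // DynamicPartitionLevelLoader at level 0: keep every partition whose
        // first field equals the newest partition's first field.
        static List<String> byLevel(List<String> sortedNewestFirst) {
            String maxPrefix = sortedNewestFirst.get(0).split("/")[0];
            return sortedNewestFirst.stream()
                    .filter(p -> p.split("/")[0].equals(maxPrefix))
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<String> sorted = Arrays.asList("2025/16", "2025/15", "2024/15");
            System.out.println(byNumber(sorted, 1)); // [2025/16]
            System.out.println(byLevel(sorted));     // [2025/16, 2025/15]
        }
    }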
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 8faafa3571..829209dbb6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -114,7 +114,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         }
 
         this.table = table;
-        this.partitionLoader = DynamicPartitionLoader.of(table);
+        this.partitionLoader = PartitionLoader.of(table);
 
         // join keys are based on projection fields
         RowType rowType = table.rowType();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
index 5caa396a1b..701c40b67c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
@@ -44,8 +44,8 @@ public abstract class PartitionLoader implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private static final String MAX_PT = "max_pt()";
-    private static final String MAX_TWO_PT = "max_two_pt()";
+    protected static final String MAX_PT = "max_pt()";
+    protected static final String MAX_TWO_PT = "max_two_pt()";
 
     protected final FileStoreTable table;
     private final RowDataToObjectArrayConverter partitionConverter;
@@ -127,13 +127,21 @@ public abstract class PartitionLoader implements Serializable {
                 break;
         }
 
+        Duration refresh =
+                options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
+
         if (maxPartitionNum == -1) {
+            if (scanPartitions.contains(MAX_PT)) {
+                return new DynamicPartitionLevelLoader(
+                        table,
+                        refresh,
+                        ParameterUtils.parseCommaSeparatedKeyValues(scanPartitions));
+            }
+
             return new StaticPartitionLoader(
                     table, ParameterUtils.getPartitions(scanPartitions.split(";")));
         } else {
-            Duration refresh =
-                    options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
-            return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
+            return new DynamicPartitionNumberLoader(table, refresh, maxPartitionNum);
         }
     }
 }
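
After this change, PartitionLoader.of dispatches among three loaders. Below is a simplified standalone sketch of the selection order, not part of the commit; the maxPartitionNum value is derived from the max_pt()/max_two_pt() switch earlier in the method, which this hunk elides.

    public class LoaderDispatchSketch {

        enum Loader { DYNAMIC_LEVEL, DYNAMIC_NUMBER, STATIC }

        // scanPartitions is the 'scan.partitions' option value; maxPartitionNum is
        // assumed to be -1 unless the option itself is "max_pt()" or "max_two_pt()".
        static Loader of(String scanPartitions, int maxPartitionNum) {
            if (maxPartitionNum == -1) {
                if (scanPartitions.contains("max_pt()")) {
                    return Loader.DYNAMIC_LEVEL;  // e.g. "year=max_pt(),day=max_pt()"
                }
                return Loader.STATIC;             // e.g. "year=2025,day=15;year=2025,day=16"
            }
            return Loader.DYNAMIC_NUMBER;         // e.g. "max_pt()" or "max_two_pt()"
        }

        public static void main(String[] args) {
            System.out.println(of("year=max_pt(),day=max_pt()", -1)); // DYNAMIC_LEVEL
            System.out.println(of("year=2025,day=15", -1));           // STATIC
            System.out.println(of("max_pt()", 1));                    // DYNAMIC_NUMBER
        }
    }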
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 2744650a8e..dfa6907c91 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1013,6 +1013,91 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         iterator.close();
     }
 
+    @ParameterizedTest
+    @EnumSource(LookupCacheMode.class)
+    public void testLookupPartitionLevelMaxPt(LookupCacheMode mode) throws Exception {
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, i INT, v 
INT)"
+                        + "PARTITIONED BY (`pt1`, `pt2`) WITH ("
+                        + "'scan.partitions' = 'pt1=max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        String query =
+                "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN 
PARTITIONED_DIM for SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql(
+                "INSERT INTO PARTITIONED_DIM VALUES ('202415', 14, 1, 1), 
('202415', 15, 1, 1), ('202414', 15, 1, 1)");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (1)");
+        List<Row> result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of("202415", 14, 1, 1), 
Row.of("202415", 15, 1, 1));
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('202416', 14, 2, 2), 
('202416', 15, 2, 2)");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (2)");
+        result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of("202416", 14, 2, 2), 
Row.of("202416", 15, 2, 2));
+
+        sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt1 = '202416',pt2 = 
'15')");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(null, null, 1, null), Row.of("202416", 14, 2, 2));
+
+        iterator.close();
+    }
+
+    @ParameterizedTest
+    @EnumSource(LookupCacheMode.class)
+    public void testLookupMultiPartitionLevelMaxPt(LookupCacheMode mode) throws Exception {
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, pt3 INT, i 
INT, v INT)"
+                        + "PARTITIONED BY (`pt1`, `pt2`, `pt3`) WITH ("
+                        + "'scan.partitions' = 'pt1=max_pt(),pt2=max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        String query =
+                "SELECT D.pt1, D.pt2, D.pt3, T.i, D.v FROM T LEFT JOIN 
PARTITIONED_DIM for SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql(
+                "INSERT INTO PARTITIONED_DIM VALUES ('202415', 15, 1, 1, 1), 
('202415', 15, 2, 1, 1), ('202414', 15, 1, 1, 1)");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (1)");
+        List<Row> result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("202415", 15, 1, 1, 1), Row.of("202415", 15, 2, 
1, 1));
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('202416', 15, 1, 2, 2), 
('202416', 15, 2, 2, 2)");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (2)");
+        result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("202416", 15, 1, 2, 2), Row.of("202416", 15, 2, 
2, 2));
+
+        sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt1 = '202416',pt2 = 
'15',pt3 = '1')");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(null, null, null, 1, null), Row.of("202416", 
15, 2, 2, 2));
+
+        iterator.close();
+    }
+
     @ParameterizedTest
     @EnumSource(LookupCacheMode.class)
     public void testLookupMaxTwoPt0(LookupCacheMode mode) throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoaderTest.java
new file mode 100644
index 0000000000..916ed69060
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoaderTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Test for {@link DynamicPartitionLevelLoader}. */
+public class DynamicPartitionLevelLoaderTest {
+
+    @TempDir private Path tempDir;
+
+    private final String commitUser = UUID.randomUUID().toString();
+    private final TraceableFileIO fileIO = new TraceableFileIO();
+
+    private org.apache.paimon.fs.Path tablePath;
+    private FileStoreTable table;
+
+    @BeforeEach
+    public void before() throws Exception {
+        tablePath = new org.apache.paimon.fs.Path(tempDir.toString());
+    }
+
+    @Test
+    public void testGetMaxPartitions() throws Exception {
+        List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
+        List<String> primaryKeys = Arrays.asList("pt1", "pt2", "pt3", "k");
+        table = createFileStoreTable(partitionKeys, primaryKeys, Collections.emptyMap());
+
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 16, 2, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 1, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2024"), 15, 1, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 2, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 16, 1, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2024"), 16, 1, 1, 
1L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // test specify first-level partition
+        Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt()");
+        table = table.copy(customOptions);
+
+        DynamicPartitionLevelLoader partitionLoader =
+                (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+        partitionLoader.open();
+        List<BinaryRow> partitions = partitionLoader.getMaxPartitions();
+        assertThat(partitions.size()).isEqualTo(4);
+        assertThat(partitionsToString(partitions))
+                .hasSameElementsAs(
+                        Arrays.asList("2025/16/2", "2025/16/1", "2025/15/2", 
"2025/15/1"));
+
+        // test specify first-level and second-level partition
+        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt2=max_pt()");
+        table = table.copy(customOptions);
+        partitionLoader = (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+        partitionLoader.open();
+        partitions = partitionLoader.getMaxPartitions();
+        assertThat(partitions.size()).isEqualTo(2);
+        assertThat(partitionsToString(partitions))
+                .hasSameElementsAs(Arrays.asList("2025/16/2", "2025/16/1"));
+
+        // test specify all level partition
+        customOptions.put(
+                FlinkConnectorOptions.SCAN_PARTITIONS.key(),
+                "pt1=max_pt(),pt2=max_pt(),pt3=max_pt()");
+        table = table.copy(customOptions);
+
+        partitionLoader = (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+        partitionLoader.open();
+        partitions = partitionLoader.getMaxPartitions();
+        assertThat(partitions.size()).isEqualTo(1);
+        assertThat(partitionsToString(partitions)).hasSameElementsAs(Arrays.asList("2025/16/2"));
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testGetMaxPartitionsWhenNullPartition() throws Exception {
+        List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
+        table =
+                createFileStoreTable(
+                        partitionKeys, Collections.emptyList(), Collections.emptyMap());
+
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 1, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 2, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, null, 
1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), null, 1, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2024"), 15, 1, 1, 
1L));
+        write.write(GenericRow.of(null, 16, 1, 1, 1L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt2=max_pt()");
+        table = table.copy(customOptions);
+
+        DynamicPartitionLevelLoader partitionLoader =
+                (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+        partitionLoader.open();
+        List<BinaryRow> partitions = partitionLoader.getMaxPartitions();
+        assertThat(partitions.size()).isEqualTo(3);
+        assertThat(partitionsToString(partitions))
+                .hasSameElementsAs(Arrays.asList("2025/15/2", "2025/15/1", 
"2025/15/null"));
+
+        write.write(GenericRow.of(BinaryString.fromString("2026"), null, null, 
1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2026"), null, 1, 1, 
1L));
+        commit.commit(2, write.prepareCommit(true, 2));
+        partitionLoader = (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+        partitionLoader.open();
+        partitions = partitionLoader.getMaxPartitions();
+        assertThat(partitions.size()).isEqualTo(2);
+        assertThat(partitionsToString(partitions))
+                .hasSameElementsAs(Arrays.asList("2026/null/1", 
"2026/null/null"));
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testWrongConfig() throws Exception {
+        List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
+        table =
+                createFileStoreTable(
+                        partitionKeys, Collections.emptyList(), Collections.emptyMap());
+
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 1, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 2, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, null, 
1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), null, 1, 1, 
1L));
+        write.write(GenericRow.of(BinaryString.fromString("2024"), 15, 1, 1, 
1L));
+        write.write(GenericRow.of(null, 16, 1, 1, 1L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt3=max_pt()");
+        table = table.copy(customOptions);
+
+        assertThatCode(() -> PartitionLoader.of(table))
+                .hasMessage(
+                        "Max load level is 0, but partition field pt3 with a 
higher level 2 sets MAX_PT.");
+
+        write.close();
+        commit.close();
+    }
+
+    private FileStoreTable createFileStoreTable(
+            List<String> partitionKeys, List<String> primaryKeys, Map<String, 
String> customOptions)
+            throws Exception {
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        Options conf = new Options(customOptions);
+        conf.set(CoreOptions.BUCKET, 2);
+        conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, Duration.ofSeconds(1));
+        if (primaryKeys.isEmpty()) {
+            conf.set(CoreOptions.BUCKET_KEY.key(), "k");
+        }
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt1", "pt2", "pt3", "k", "v"});
+        Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), "");
+        TableSchema tableSchema = schemaManager.createTable(schema);
+        return FileStoreTableFactory.create(
+                fileIO, new org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
+    }
+
+    private List<String> partitionsToString(List<BinaryRow> partitions) {
+        return partitions.stream()
+                .map(
+                        partition ->
+                                InternalRowPartitionComputer.partToSimpleString(
+                                        table.rowType().project(table.partitionKeys()),
+                                        partition,
+                                        "/",
+                                        200))
+                .collect(Collectors.toList());
+    }
+}

