This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e7aa863794 [flink] support specify max_pt for partition level (#5831)
e7aa863794 is described below
commit e7aa8637949523b80eb96d0ea419c288b1a5bace
Author: LsomeYeah <[email protected]>
AuthorDate: Fri Jul 4 18:40:49 2025 +0800
[flink] support specify max_pt for partition level (#5831)
---
docs/content/flink/sql-lookup.md | 6 +
.../flink/lookup/DynamicPartitionLevelLoader.java | 150 +++++++++++++
.../flink/lookup/DynamicPartitionLoader.java | 60 ++---
.../flink/lookup/DynamicPartitionNumberLoader.java | 62 ++++++
.../flink/lookup/FileStoreLookupFunction.java | 2 +-
.../paimon/flink/lookup/PartitionLoader.java | 18 +-
.../org/apache/paimon/flink/LookupJoinITCase.java | 85 ++++++++
.../lookup/DynamicPartitionLevelLoaderTest.java | 241 +++++++++++++++++++++
8 files changed, 581 insertions(+), 43 deletions(-)
diff --git a/docs/content/flink/sql-lookup.md b/docs/content/flink/sql-lookup.md
index 537af3cd12..2e2e0e30ed 100644
--- a/docs/content/flink/sql-lookup.md
+++ b/docs/content/flink/sql-lookup.md
@@ -173,6 +173,12 @@ The option `scan.partitions` can also specify fixed partitions in the form of `k
Multiple partitions should be separated by semicolon (`;`).
When specifying fixed partitions, this option can also be used in batch joins.
+The option `scan.partitions` can also specify `max_pt()` for a parent partition, in the form of `key1=max_pt(),key2=max_pt()`.
+All subpartitions of the latest parent partition will be loaded. For example, if the partition keys are `'year', 'day', 'hh'`,
+you can specify `year=max_pt()`; it will find the latest `year` partition and load all of its subpartitions for lookup.
+Partitions can only be specified hierarchically, from the top level down. For example, setting `year=max_pt(),hh=max_pt()`
+is invalid, because `hh=max_pt()` without `day=max_pt()` makes no sense.
+
## Query Service
You can run a Flink Streaming Job to start query service for the table. When QueryService exists, Flink Lookup Join
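As a usage illustration of the documented option above, here is a minimal, hypothetical sketch (table and column names are illustrative only; it assumes the session's current catalog is a Paimon catalog, as in the ITCase below, otherwise a connector option would be required):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MaxPtLevelExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Hypothetical dimension table: the partition keys form the
            // hierarchy year > day > hh; only the top level sets max_pt().
            tEnv.executeSql(
                    "CREATE TABLE dim (`year` STRING, `day` STRING, hh STRING, k INT, v INT) "
                            + "PARTITIONED BY (`year`, `day`, hh) WITH ("
                            + "'scan.partitions' = 'year=max_pt()', "
                            + "'lookup.dynamic-partition.refresh-interval' = '10 s')");
            // A lookup join against `dim` then sees every day/hh subpartition of
            // the newest `year` partition, re-evaluated on the refresh interval.
        }
    }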
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
new file mode 100644
index 0000000000..e4ffe2a5e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Dynamic partition loader which can specify the partition level to load for lookup. */
+public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLevelLoader.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private final int maxPartitionLoadLevel;
+ private final List<InternalRow.FieldGetter> fieldGetters;
+
+ private final String defaultPartitionName;
+
+ DynamicPartitionLevelLoader(
+ FileStoreTable table,
+ Duration refreshInterval,
+ Map<String, String> partitionLoadConfig) {
+ super(table, refreshInterval);
+ maxPartitionLoadLevel =
+                getMaxPartitionLoadLevel(partitionLoadConfig, table.partitionKeys());
+ fieldGetters = createPartitionFieldGetters();
+ defaultPartitionName = table.coreOptions().partitionDefaultName();
+
+ LOG.info(
+                "Init DynamicPartitionLevelLoader(table={}),maxPartitionLoadLevel is {}",
+ table.name(),
+ maxPartitionLoadLevel);
+ }
+
+ @Override
+ protected List<BinaryRow> getMaxPartitions() {
+ List<BinaryRow> newPartitions =
+ table.newReadBuilder().newScan().listPartitions().stream()
+ .sorted(comparator.reversed())
+ .collect(Collectors.toList());
+
+ if (maxPartitionLoadLevel == table.partitionKeys().size() - 1) {
+            // if maxPartitionLoadLevel is the max partition level, we only need to load the max
+ // partition
+ if (newPartitions.size() <= 1) {
+ return newPartitions;
+ } else {
+ return newPartitions.subList(0, 1);
+ }
+ }
+
+        newPartitions = extractMaxPartitionsForFixedLevel(newPartitions, maxPartitionLoadLevel);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+                    "DynamicPartitionLevelLoader(currentPartitionLoadLevel={},table={}) finds new partitions: {}.",
+ maxPartitionLoadLevel,
+ table.name(),
+ partitionsToString(newPartitions));
+ }
+
+ return newPartitions;
+ }
+
+    private int getMaxPartitionLoadLevel(Map<String, String> toLoad, List<String> fields) {
+ Preconditions.checkArgument(toLoad.size() <= fields.size());
+ int maxLoadLevel = fields.size() - 1;
+ for (int i = 0; i < fields.size(); i++) {
+ if (!toLoad.containsKey(fields.get(i))) {
+ maxLoadLevel = i - 1;
+ break;
+ }
+ }
+ Preconditions.checkArgument(
+                maxLoadLevel >= 0, "the top level partition must set load config.");
+ for (int i = maxLoadLevel + 1; i < fields.size(); i++) {
+ Preconditions.checkArgument(
+ !toLoad.containsKey(fields.get(i)),
+ "Max load level is %s, "
+                            + "but partition field %s with a higher level %s sets MAX_PT.",
+ maxLoadLevel,
+ fields.get(i),
+ i);
+ }
+ return maxLoadLevel;
+ }
+
+ private List<InternalRow.FieldGetter> createPartitionFieldGetters() {
+ List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();
+
+ RowType partitionType = table.rowType().project(table.partitionKeys());
+
+ for (int i = 0; i < maxPartitionLoadLevel + 1; i++) {
+            fieldGetters.add(InternalRow.createFieldGetter(partitionType.getTypeAt(i), i));
+ }
+ return fieldGetters;
+ }
+
+ private List<BinaryRow> extractMaxPartitionsForFixedLevel(
+ List<BinaryRow> partitions, int level) {
+ int currentDistinct = 0;
+ Object[] lastFields = new Object[level + 1];
+ for (int i = 0; i < partitions.size(); i++) {
+ BinaryRow partition = partitions.get(i);
+ Object[] newFields = new Object[level + 1];
+ for (int j = 0; j <= level; j++) {
+ newFields[j] = fieldGetters.get(j).getFieldOrNull(partition);
+ if (newFields[j] == null) {
+ newFields[j] = defaultPartitionName;
+ }
+ }
+ if (!Arrays.equals(newFields, lastFields)) {
+ lastFields = newFields;
+ if (++currentDistinct > 1) {
+ return partitions.subList(0, i);
+ }
+ }
+ }
+ return partitions;
+ }
+}
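Read together, the two private helpers implement a simple rule: the configured max_pt() keys must form a prefix of the partition keys, and among the descending-sorted partitions only the group sharing the greatest prefix survives. A hedged, standalone restatement of that rule (plain String[] rows instead of BinaryRow, illustrative values, and the null/default-partition handling omitted):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MaxPtLevelSketch {

        // Mirrors getMaxPartitionLoadLevel: configured keys must be a prefix of
        // the partition keys; the load level is the last index of that prefix.
        static int maxLoadLevel(List<String> configuredKeys, List<String> partitionKeys) {
            int level = partitionKeys.size() - 1;
            for (int i = 0; i < partitionKeys.size(); i++) {
                if (!configuredKeys.contains(partitionKeys.get(i))) {
                    level = i - 1;
                    break;
                }
            }
            if (level < 0) {
                throw new IllegalArgumentException("the top level partition must set load config.");
            }
            return level;
        }

        // Mirrors extractMaxPartitionsForFixedLevel: input is sorted descending,
        // so the first change of the level-prefix ends the "max" group.
        static List<String[]> keepMaxPrefix(List<String[]> sortedDesc, int level) {
            List<String[]> result = new ArrayList<>();
            String[] maxPrefix = null;
            for (String[] partition : sortedDesc) {
                String[] prefix = Arrays.copyOfRange(partition, 0, level + 1);
                if (maxPrefix == null) {
                    maxPrefix = prefix;
                } else if (!Arrays.equals(prefix, maxPrefix)) {
                    break;
                }
                result.add(partition);
            }
            return result;
        }

        public static void main(String[] args) {
            List<String[]> partitions =
                    Arrays.asList(
                            new String[] {"2025", "16", "2"},
                            new String[] {"2025", "16", "1"},
                            new String[] {"2025", "15", "2"},
                            new String[] {"2024", "16", "1"});
            // year=max_pt() -> level 0 -> every 2025 partition is kept.
            int level = maxLoadLevel(Arrays.asList("year"), Arrays.asList("year", "day", "hh"));
            System.out.println(keepMaxPrefix(partitions, level).size()); // prints 3
        }
    }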
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 1683e8707a..201997da55 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -35,22 +35,20 @@ import java.util.List;
import java.util.stream.Collectors;
/** Dynamic partition for lookup. */
-public class DynamicPartitionLoader extends PartitionLoader {
+public abstract class DynamicPartitionLoader extends PartitionLoader {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLoader.class);
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
- private final Duration refreshInterval;
- private final int maxPartitionNum;
+ protected final Duration refreshInterval;
- private transient Comparator<InternalRow> comparator;
- private transient LocalDateTime lastRefresh;
+ protected transient Comparator<InternalRow> comparator;
+ protected transient LocalDateTime lastRefresh;
-    DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval, int maxPartitionNum) {
+ DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval) {
super(table);
this.refreshInterval = refreshInterval;
- this.maxPartitionNum = maxPartitionNum;
}
@Override
@@ -70,8 +68,7 @@ public class DynamicPartitionLoader extends PartitionLoader {
}
LOG.info(
-                "DynamicPartitionLoader(maxPartitionNum={},table={}) refreshed after {} second(s), refreshing",
-                maxPartitionNum,
+                "DynamicPartitionLoader(table={}) refreshed after {} second(s), refreshing",
table.name(),
refreshInterval.toMillis() / 1000);
@@ -90,42 +87,31 @@ public class DynamicPartitionLoader extends PartitionLoader {
return true;
}
}
-        LOG.info(
-                "DynamicPartitionLoader(maxPartitionNum={},table={}) didn't find new partitions.",
-                maxPartitionNum,
-                table.name());
+        LOG.info("DynamicPartitionLoader(table={}) didn't find new partitions.", table.name());
return false;
}
}
+ protected abstract List<BinaryRow> getMaxPartitions();
+
private void logNewPartitions() {
-        String partitionsStr =
-                partitions.stream()
-                        .map(
-                                partition ->
-                                        InternalRowPartitionComputer.partToSimpleString(
-                                                table.rowType().project(table.partitionKeys()),
-                                                partition,
-                                                "-",
-                                                200))
-                        .collect(Collectors.joining(","));
+ String partitionsStr = partitionsToString(partitions);
+
LOG.info(
-                "DynamicPartitionLoader(maxPartitionNum={},table={}) finds new partitions: {}.",
-                maxPartitionNum,
+ "DynamicPartitionLoader(table={}) finds new partitions: {}.",
table.name(),
partitionsStr);
}
- private List<BinaryRow> getMaxPartitions() {
- List<BinaryRow> newPartitions =
- table.newReadBuilder().newScan().listPartitions().stream()
- .sorted(comparator.reversed())
- .collect(Collectors.toList());
-
- if (newPartitions.size() <= maxPartitionNum) {
- return newPartitions;
- } else {
- return newPartitions.subList(0, maxPartitionNum);
- }
+ protected String partitionsToString(List<BinaryRow> partitions) {
+ return partitions.stream()
+                .map(
+                        partition ->
+                                InternalRowPartitionComputer.partToSimpleString(
+                                        table.rowType().project(table.partitionKeys()),
+                                        partition,
+                                        "-",
+                                        200))
+                .collect(Collectors.joining(","));
}
}
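With getMaxPartitions() now abstract, this class becomes the base of a small template-method hierarchy: the base keeps the refresh-interval bookkeeping and logging, and each subclass contributes only its partition-selection policy. A hedged shape sketch (simplified types, not the real Paimon signatures):

    import java.util.List;

    // Simplified shape of the hierarchy after this patch.
    abstract class LoaderShape {
        // Base class: shared refresh flow (interval check and logging elided).
        final boolean refreshed() {
            List<String> newPartitions = getMaxPartitions(); // subclass policy hook
            return !newPartitions.isEmpty();
        }

        // DynamicPartitionLevelLoader: newest prefix up to the configured level.
        // DynamicPartitionNumberLoader: the N newest partitions.
        protected abstract List<String> getMaxPartitions();
    }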
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
new file mode 100644
index 0000000000..52c1d4c76c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Dynamic partition loader which can specify the max partition number to load for lookup. */
+public class DynamicPartitionNumberLoader extends DynamicPartitionLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionNumberLoader.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private final int maxPartitionNum;
+
+    DynamicPartitionNumberLoader(
+            FileStoreTable table, Duration refreshInterval, int maxPartitionNum) {
+ super(table, refreshInterval);
+ this.maxPartitionNum = maxPartitionNum;
+ LOG.info(
+                "Init DynamicPartitionNumberLoader(table={}),maxPartitionNum is {}",
+ table.name(),
+ maxPartitionNum);
+ }
+
+ protected List<BinaryRow> getMaxPartitions() {
+ List<BinaryRow> newPartitions =
+ table.newReadBuilder().newScan().listPartitions().stream()
+ .sorted(comparator.reversed())
+ .collect(Collectors.toList());
+
+ if (newPartitions.size() <= maxPartitionNum) {
+ return newPartitions;
+ } else {
+ return newPartitions.subList(0, maxPartitionNum);
+ }
+ }
+}
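The selection here is just "newest N": a hedged miniature of the same computation over plain strings (values illustrative; the real code sorts BinaryRows with the partition comparator):

    import java.util.Arrays;
    import java.util.List;

    public class MaxPartitionNumSketch {
        public static void main(String[] args) {
            // Partitions already sorted newest-first, as after comparator.reversed().
            List<String> sorted = Arrays.asList("2025/16", "2025/15", "2024/16");
            int maxPartitionNum = 2; // e.g. max_two_pt()
            List<String> kept = sorted.subList(0, Math.min(maxPartitionNum, sorted.size()));
            System.out.println(kept); // [2025/16, 2025/15]
        }
    }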
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 8faafa3571..829209dbb6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -114,7 +114,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
}
this.table = table;
- this.partitionLoader = DynamicPartitionLoader.of(table);
+ this.partitionLoader = PartitionLoader.of(table);
// join keys are based on projection fields
RowType rowType = table.rowType();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
index 5caa396a1b..701c40b67c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
@@ -44,8 +44,8 @@ public abstract class PartitionLoader implements Serializable {
private static final long serialVersionUID = 1L;
- private static final String MAX_PT = "max_pt()";
- private static final String MAX_TWO_PT = "max_two_pt()";
+ protected static final String MAX_PT = "max_pt()";
+ protected static final String MAX_TWO_PT = "max_two_pt()";
protected final FileStoreTable table;
private final RowDataToObjectArrayConverter partitionConverter;
@@ -127,13 +127,21 @@ public abstract class PartitionLoader implements Serializable {
break;
}
+ Duration refresh =
+                options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
+
if (maxPartitionNum == -1) {
+ if (scanPartitions.contains(MAX_PT)) {
+ return new DynamicPartitionLevelLoader(
+ table,
+ refresh,
+                        ParameterUtils.parseCommaSeparatedKeyValues(scanPartitions));
+ }
+
return new StaticPartitionLoader(
table,
ParameterUtils.getPartitions(scanPartitions.split(";")));
} else {
-            Duration refresh =
-                    options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
- return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
+            return new DynamicPartitionNumberLoader(table, refresh, maxPartitionNum);
}
}
}
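The factory now dispatches on two signals: whether a max-partition count was parsed out of a bare max_pt()/max_two_pt() value, and whether the raw option string still contains max_pt() per key. A hedged, stubbed summary of that rule (strings in place of the real loader instances; the pairing of option string to count mirrors the patch's earlier parsing step):

    public class LoaderDispatchSketch {
        // maxPartitionNum == -1 means no bare max_pt()/max_two_pt() value was
        // consumed by the earlier parsing step, mirroring the patch's convention.
        static String choose(String scanPartitions, int maxPartitionNum) {
            if (maxPartitionNum == -1) {
                if (scanPartitions.contains("max_pt()")) {
                    return "DynamicPartitionLevelLoader"; // per-level max_pt(), new here
                }
                return "StaticPartitionLoader"; // fixed partitions, e.g. 'pt=20240101'
            }
            return "DynamicPartitionNumberLoader"; // newest-N partitions
        }

        public static void main(String[] args) {
            System.out.println(choose("pt1=max_pt(),pt2=max_pt()", -1)); // level loader
            System.out.println(choose("pt=20240101", -1)); // static loader
            System.out.println(choose("max_pt()", 1)); // number loader (newest partition)
        }
    }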
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 2744650a8e..dfa6907c91 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1013,6 +1013,91 @@ public class LookupJoinITCase extends CatalogITCaseBase {
iterator.close();
}
+ @ParameterizedTest
+ @EnumSource(LookupCacheMode.class)
+    public void testLookupPartitionLevelMaxPt(LookupCacheMode mode) throws Exception {
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, i INT, v INT)"
+                        + "PARTITIONED BY (`pt1`, `pt2`) WITH ("
+                        + "'scan.partitions' = 'pt1=max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        String query =
+                "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql(
+                "INSERT INTO PARTITIONED_DIM VALUES ('202415', 14, 1, 1), ('202415', 15, 1, 1), ('202414', 15, 1, 1)");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (1)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result)
+                .containsExactlyInAnyOrder(Row.of("202415", 14, 1, 1), Row.of("202415", 15, 1, 1));
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('202416', 14, 2, 2), ('202416', 15, 2, 2)");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (2)");
+ result = iterator.collect(2);
+ assertThat(result)
+                .containsExactlyInAnyOrder(Row.of("202416", 14, 2, 2), Row.of("202416", 15, 2, 2));
+
+        sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt1 = '202416',pt2 = '15')");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(null, null, 1, null), Row.of("202416", 14, 2, 2));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(LookupCacheMode.class)
+    public void testLookupMultiPartitionLevelMaxPt(LookupCacheMode mode) throws Exception {
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, pt3 INT, i INT, v INT)"
+                        + "PARTITIONED BY (`pt1`, `pt2`, `pt3`) WITH ("
+                        + "'scan.partitions' = 'pt1=max_pt(),pt2=max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        String query =
+                "SELECT D.pt1, D.pt2, D.pt3, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql(
+                "INSERT INTO PARTITIONED_DIM VALUES ('202415', 15, 1, 1, 1), ('202415', 15, 2, 1, 1), ('202414', 15, 1, 1, 1)");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (1)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+                        Row.of("202415", 15, 1, 1, 1), Row.of("202415", 15, 2, 1, 1));
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('202416', 15, 1, 2, 2), ('202416', 15, 2, 2, 2)");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (2)");
+ result = iterator.collect(2);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+                        Row.of("202416", 15, 1, 2, 2), Row.of("202416", 15, 2, 2, 2));
+
+        sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt1 = '202416',pt2 = '15',pt3 = '1')");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+                        Row.of(null, null, null, 1, null), Row.of("202416", 15, 2, 2, 2));
+
+ iterator.close();
+ }
+
@ParameterizedTest
@EnumSource(LookupCacheMode.class)
public void testLookupMaxTwoPt0(LookupCacheMode mode) throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoaderTest.java
new file mode 100644
index 0000000000..916ed69060
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoaderTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Test for {@link DynamicPartitionLevelLoader}. */
+public class DynamicPartitionLevelLoaderTest {
+
+ @TempDir private Path tempDir;
+
+ private final String commitUser = UUID.randomUUID().toString();
+ private final TraceableFileIO fileIO = new TraceableFileIO();
+
+ private org.apache.paimon.fs.Path tablePath;
+ private FileStoreTable table;
+
+ @BeforeEach
+ public void before() throws Exception {
+ tablePath = new org.apache.paimon.fs.Path(tempDir.toString());
+ }
+
+ @Test
+ public void testGetMaxPartitions() throws Exception {
+ List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
+ List<String> primaryKeys = Arrays.asList("pt1", "pt2", "pt3", "k");
+        table = createFileStoreTable(partitionKeys, primaryKeys, Collections.emptyMap());
+
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 16, 2, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 1, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2024"), 15, 1, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 2, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 16, 1, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2024"), 16, 1, 1, 1L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // test specify first-level partition
+ Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt()");
+ table = table.copy(customOptions);
+
+ DynamicPartitionLevelLoader partitionLoader =
+ (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+ partitionLoader.open();
+ List<BinaryRow> partitions = partitionLoader.getMaxPartitions();
+ assertThat(partitions.size()).isEqualTo(4);
+ assertThat(partitionsToString(partitions))
+ .hasSameElementsAs(
+                        Arrays.asList("2025/16/2", "2025/16/1", "2025/15/2", "2025/15/1"));
+
+ // test specify first-level and second-level partition
+        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt2=max_pt()");
+ table = table.copy(customOptions);
+        partitionLoader = (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+ partitionLoader.open();
+ partitions = partitionLoader.getMaxPartitions();
+ assertThat(partitions.size()).isEqualTo(2);
+ assertThat(partitionsToString(partitions))
+ .hasSameElementsAs(Arrays.asList("2025/16/2", "2025/16/1"));
+
+ // test specify all level partition
+ customOptions.put(
+ FlinkConnectorOptions.SCAN_PARTITIONS.key(),
+ "pt1=max_pt(),pt2=max_pt(),pt3=max_pt()");
+ table = table.copy(customOptions);
+
+        partitionLoader = (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+ partitionLoader.open();
+ partitions = partitionLoader.getMaxPartitions();
+ assertThat(partitions.size()).isEqualTo(1);
+        assertThat(partitionsToString(partitions)).hasSameElementsAs(Arrays.asList("2025/16/2"));
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testGetMaxPartitionsWhenNullPartition() throws Exception {
+ List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
+ table =
+ createFileStoreTable(
+                        partitionKeys, Collections.emptyList(), Collections.emptyMap());
+
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 1, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 2, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, null, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), null, 1, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2024"), 15, 1, 1, 1L));
+ write.write(GenericRow.of(null, 16, 1, 1, 1L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt2=max_pt()");
+ table = table.copy(customOptions);
+
+ DynamicPartitionLevelLoader partitionLoader =
+ (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+ partitionLoader.open();
+ List<BinaryRow> partitions = partitionLoader.getMaxPartitions();
+ assertThat(partitions.size()).isEqualTo(3);
+ assertThat(partitionsToString(partitions))
+                .hasSameElementsAs(Arrays.asList("2025/15/2", "2025/15/1", "2025/15/null"));
+
+        write.write(GenericRow.of(BinaryString.fromString("2026"), null, null, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2026"), null, 1, 1, 1L));
+ commit.commit(2, write.prepareCommit(true, 2));
+        partitionLoader = (DynamicPartitionLevelLoader) PartitionLoader.of(table);
+ partitionLoader.open();
+ partitions = partitionLoader.getMaxPartitions();
+ assertThat(partitions.size()).isEqualTo(2);
+ assertThat(partitionsToString(partitions))
+                .hasSameElementsAs(Arrays.asList("2026/null/1", "2026/null/null"));
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testWrongConfig() throws Exception {
+ List<String> partitionKeys = Arrays.asList("pt1", "pt2", "pt3");
+ table =
+ createFileStoreTable(
+                        partitionKeys, Collections.emptyList(), Collections.emptyMap());
+
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 1, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, 2, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), 15, null, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2025"), null, 1, 1, 1L));
+        write.write(GenericRow.of(BinaryString.fromString("2024"), 15, 1, 1, 1L));
+ write.write(GenericRow.of(null, 16, 1, 1, 1L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(FlinkConnectorOptions.SCAN_PARTITIONS.key(), "pt1=max_pt(),pt3=max_pt()");
+ table = table.copy(customOptions);
+
+ assertThatCode(() -> PartitionLoader.of(table))
+ .hasMessage(
+                        "Max load level is 0, but partition field pt3 with a higher level 2 sets MAX_PT.");
+
+ write.close();
+ commit.close();
+ }
+
+ private FileStoreTable createFileStoreTable(
+            List<String> partitionKeys, List<String> primaryKeys, Map<String, String> customOptions)
+ throws Exception {
+ SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+ Options conf = new Options(customOptions);
+ conf.set(CoreOptions.BUCKET, 2);
+        conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, Duration.ofSeconds(1));
+ if (primaryKeys.isEmpty()) {
+ conf.set(CoreOptions.BUCKET_KEY.key(), "k");
+ }
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"pt1", "pt2", "pt3", "k", "v"});
+ Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), "");
+ TableSchema tableSchema = schemaManager.createTable(schema);
+ return FileStoreTableFactory.create(
+                fileIO, new org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
+ }
+
+ private List<String> partitionsToString(List<BinaryRow> partitions) {
+ return partitions.stream()
+                .map(
+                        partition ->
+                                InternalRowPartitionComputer.partToSimpleString(
+                                        table.rowType().project(table.partitionKeys()),
+                                        partition,
+                                        "/",
+                                        200))
+                .collect(Collectors.toList());
+ }
+}