This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 30d8c1cf5d [flink] Flink batch job support specify partition with max_pt() and max_two_pt() (#6728)
30d8c1cf5d is described below
commit 30d8c1cf5d50e9d44431ca4eb4b352821097d0b5
Author: yuzelin <[email protected]>
AuthorDate: Mon Dec 15 13:46:32 2025 +0800
    [flink] Flink batch job support specify partition with max_pt() and max_two_pt() (#6728)
---
.../generated/flink_connector_configuration.html | 2 +-
.../paimon/partition/PartitionPredicate.java | 35 +++++++----
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 ++-
.../flink/lookup/DynamicPartitionLevelLoader.java | 13 ++--
.../flink/lookup/DynamicPartitionLoader.java | 6 +-
.../flink/lookup/DynamicPartitionNumberLoader.java | 8 +--
.../paimon/flink/lookup/PartitionLoader.java | 39 +++---------
.../paimon/flink/lookup/StaticPartitionLoader.java | 13 ++--
.../paimon/flink/source/FlinkTableSource.java | 73 ++++++++++++----------
.../apache/paimon/flink/BatchFileStoreITCase.java | 46 ++++++++++++++
10 files changed, 145 insertions(+), 97 deletions(-)
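For orientation before the diff: the tests added below exercise the new behavior through SQL hints. A minimal usage sketch (assuming tables P and Q as defined in the tests, with P partitioned by pt): in a Flink batch job, max_pt() restricts the scan to the partition with the largest partition value.

    -- batch join against only the newest partition of P
    SELECT Q.id, P.v
    FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_pt()') */
    ON Q.id = P.id;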
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 30aa9dea29..0549fde466 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -180,7 +180,7 @@ under the License.
             <td><h5>scan.partitions</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Specify the partitions to scan. Partitions should be given in the form of key1=value1,key2=value2. Partition keys not specified will be filled with the value of partition.default-name. Multiple partitions should be separated by semicolon (;). This option can support normal source tables and lookup join tables. For lookup joins, two special values max_pt() and max_two_pt() are also supported, specifying the (two) partition(s) with the largest partition value.</td>
+            <td>Specify the partitions to scan. Partitions should be given in the form of key1=value1,key2=value2. Partition keys not specified will be filled with the value of partition.default-name. Multiple partitions should be separated by semicolon (;). This option can support normal source tables and lookup join tables. Two special values max_pt() and max_two_pt() are also supported, specifying the (two) partition(s) with the largest partition value. For lookup sources, the max partition(s) will be periodically refreshed; for normal sources, the max partition(s) are determined before the job runs and are not refreshed, even for streaming jobs.</td>
         </tr>
         <tr>
             <td><h5>scan.remove-normalize</h5></td>
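As the description says, the static form lists key=value pairs and separates multiple partitions with semicolons. A hedged sketch (table T and partition column dt are hypothetical, not from this change):

    -- scan exactly two static partitions of a table partitioned by dt
    SELECT * FROM T /*+ OPTIONS('scan.partitions' = 'dt=20251214;dt=20251215') */;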
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 1f041f5153..cb4e952664 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -273,25 +273,34 @@ public interface PartitionPredicate extends Serializable {
         return predicate;
     }

-    static Predicate createPartitionPredicate(RowType partitionType, Object[] partition) {
-        Preconditions.checkArgument(
-                partition.length == partitionType.getFieldCount(),
-                "Partition's field count should be equal to partitionType's field count.");
-
-        Map<String, Object> partitionMap = new HashMap<>(partition.length);
-        for (int i = 0; i < partition.length; i++) {
-            partitionMap.put(partitionType.getFields().get(i).name(), partition[i]);
+    static Predicate createPartitionPredicate(
+            RowType rowType, RowDataToObjectArrayConverter converter, List<BinaryRow> partitions) {
+        Predicate partFilter = null;
+        for (BinaryRow partition : partitions) {
+            if (partFilter == null) {
+                partFilter = createSinglePartitionPredicate(rowType, converter, partition);
+            } else {
+                partFilter =
+                        PredicateBuilder.or(
+                                partFilter,
+                                createSinglePartitionPredicate(rowType, converter, partition));
+            }
         }
-
-        return createPartitionPredicate(partitionType, partitionMap);
+        return partFilter;
     }

-    static Predicate createPartitionPredicate(RowType partitionType, BinaryRow partition) {
+    static Predicate createSinglePartitionPredicate(
+            RowType rowType, RowDataToObjectArrayConverter converter, BinaryRow partition) {
+        RowType partitionType = converter.rowType();
         Preconditions.checkArgument(
                 partition.getFieldCount() == partitionType.getFieldCount(),
                 "Partition's field count should be equal to partitionType's field count.");
-        RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType);
-        return createPartitionPredicate(partitionType, converter.convert(partition));
+        Object[] partitionSpec = converter.convert(partition);
+        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
+        for (int i = 0; i < partitionSpec.length; i++) {
+            partitionMap.put(partitionType.getFields().get(i).name(), partitionSpec[i]);
+        }
+        return createPartitionPredicate(rowType, partitionMap);
     }

     @Nullable
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index eaf959b477..a1089d991a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -302,8 +302,11 @@ public class FlinkConnectorOptions {
                                     + CoreOptions.PARTITION_DEFAULT_NAME.key()
                                     + ". Multiple partitions should be separated by semicolon (;). "
                                     + "This option can support normal source tables and lookup join tables. "
-                                    + "For lookup joins, two special values max_pt() and max_two_pt() are also supported, "
-                                    + "specifying the (two) partition(s) with the largest partition value.");
+                                    + "Two special values max_pt() and max_two_pt() are also supported, "
+                                    + "specifying the (two) partition(s) with the largest partition value. For "
+                                    + "lookup sources, the max partition(s) will be periodically refreshed; for "
+                                    + "normal sources, the max partition(s) are determined before the job runs "
+                                    + "and are not refreshed, even for streaming jobs.");

     public static final ConfigOption<Duration> LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL =
             ConfigOptions.key("lookup.dynamic-partition.refresh-interval")
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
index e4ffe2a5e8..88920c05fc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
@@ -20,7 +20,8 @@ package org.apache.paimon.flink.lookup;

 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
@@ -34,6 +35,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;

+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
+
 /** Dynamic partition loader which can specify the partition level to load for lookup. */
 public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
@@ -47,14 +50,12 @@ public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
     private final String defaultPartitionName;

     DynamicPartitionLevelLoader(
-            FileStoreTable table,
-            Duration refreshInterval,
-            Map<String, String> partitionLoadConfig) {
+            Table table, Duration refreshInterval, Map<String, String> partitionLoadConfig) {
         super(table, refreshInterval);
         maxPartitionLoadLevel =
                 getMaxPartitionLoadLevel(partitionLoadConfig, table.partitionKeys());
         fieldGetters = createPartitionFieldGetters();
-        defaultPartitionName = table.coreOptions().partitionDefaultName();
+        defaultPartitionName = Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
         LOG.info(
                 "Init DynamicPartitionLevelLoader(table={}),maxPartitionLoadLevel is {}",
@@ -63,7 +64,7 @@ public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
     }

     @Override
-    protected List<BinaryRow> getMaxPartitions() {
+    public List<BinaryRow> getMaxPartitions() {
         List<BinaryRow> newPartitions =
                 table.newReadBuilder().newScan().listPartitions().stream()
                         .sorted(comparator.reversed())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 201997da55..37671dee0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.lookup;

 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -46,7 +46,7 @@ public abstract class DynamicPartitionLoader extends PartitionLoader {
     protected transient Comparator<InternalRow> comparator;
     protected transient LocalDateTime lastRefresh;

-    DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval) {
+    DynamicPartitionLoader(Table table, Duration refreshInterval) {
         super(table);
         this.refreshInterval = refreshInterval;
     }
@@ -92,7 +92,7 @@ public abstract class DynamicPartitionLoader extends PartitionLoader {
         }
     }

-    protected abstract List<BinaryRow> getMaxPartitions();
+    public abstract List<BinaryRow> getMaxPartitions();

     private void logNewPartitions() {
         String partitionsStr = partitionsToString(partitions);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
index 52c1d4c76c..7e210725de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.lookup;

 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +37,7 @@ public class DynamicPartitionNumberLoader extends DynamicPartitionLoader {

     private final int maxPartitionNum;

-    DynamicPartitionNumberLoader(
-            FileStoreTable table, Duration refreshInterval, int maxPartitionNum) {
+    DynamicPartitionNumberLoader(Table table, Duration refreshInterval, int maxPartitionNum) {
         super(table, refreshInterval);
         this.maxPartitionNum = maxPartitionNum;
         LOG.info(
@@ -47,7 +46,8 @@ public class DynamicPartitionNumberLoader extends DynamicPartitionLoader {
                 maxPartitionNum);
     }

-    protected List<BinaryRow> getMaxPartitions() {
+    @Override
+    public List<BinaryRow> getMaxPartitions() {
         List<BinaryRow> newPartitions =
                 table.newReadBuilder().newScan().listPartitions().stream()
                         .sorted(comparator.reversed())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
index 701c40b67c..638e5254c7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
@@ -23,9 +23,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.RowType;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.ParameterUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
@@ -35,24 +33,22 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;

 /** Specify partitions for lookup tables. */
 public abstract class PartitionLoader implements Serializable {

     private static final long serialVersionUID = 1L;

-    protected static final String MAX_PT = "max_pt()";
-    protected static final String MAX_TWO_PT = "max_two_pt()";
+    private static final String MAX_PT = "max_pt()";
+    private static final String MAX_TWO_PT = "max_two_pt()";

-    protected final FileStoreTable table;
+    protected final Table table;
     private final RowDataToObjectArrayConverter partitionConverter;

     protected transient List<BinaryRow> partitions;

-    protected PartitionLoader(FileStoreTable table) {
+    protected PartitionLoader(Table table) {
         this.table = table;
         this.partitionConverter =
                 new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
@@ -77,35 +73,16 @@ public abstract class PartitionLoader implements Serializable {
     }

     public Predicate createSpecificPartFilter() {
-        Predicate partFilter = null;
-        for (BinaryRow partition : partitions) {
-            if (partFilter == null) {
-                partFilter = createSinglePartFilter(partition);
-            } else {
-                partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
-            }
-        }
-        return partFilter;
-    }
-
-    private Predicate createSinglePartFilter(BinaryRow partition) {
-        RowType rowType = table.rowType();
-        List<String> partitionKeys = table.partitionKeys();
-        Object[] partitionSpec = partitionConverter.convert(partition);
-        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
-        for (int i = 0; i < partitionSpec.length; i++) {
-            partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
-        }
-
         // create partition predicate base on rowType instead of partitionType
-        return PartitionPredicate.createPartitionPredicate(rowType, partitionMap);
+        return PartitionPredicate.createPartitionPredicate(
+                table.rowType(), partitionConverter, partitions);
     }

     /** @return true if partition changed. */
     public abstract boolean checkRefresh();

     @Nullable
-    public static PartitionLoader of(FileStoreTable table) {
+    public static PartitionLoader of(Table table) {
         Options options = Options.fromMap(table.options());
         String scanPartitions = options.get(FlinkConnectorOptions.SCAN_PARTITIONS);
         if (scanPartitions == null) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
index b6387a20a5..bdfecc39f9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
@@ -20,7 +20,8 @@ package org.apache.paimon.flink.lookup;

 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -28,13 +29,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
+
 /** {@link PartitionLoader} for specified static partitions. */
 public class StaticPartitionLoader extends PartitionLoader {

     private final List<Map<String, String>> scanPartitions;

-    protected StaticPartitionLoader(
-            FileStoreTable table, List<Map<String, String>> scanPartitions) {
+    protected StaticPartitionLoader(Table table, List<Map<String, String>> scanPartitions) {
         super(table);
         this.scanPartitions = scanPartitions;
     }
@@ -42,12 +44,13 @@ public class StaticPartitionLoader extends PartitionLoader {
     @Override
     public void open() {
         partitions = new ArrayList<>();
-        RowType partitionType = table.schema().logicalPartitionType();
+        RowType partitionType = table.rowType().project(table.partitionKeys());
+        String defaultPartitionName = Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
         InternalRowSerializer serializer = new InternalRowSerializer(partitionType);
         for (Map<String, String> spec : scanPartitions) {
             GenericRow row =
                     InternalRowPartitionComputer.convertSpecToInternalRow(
-                            spec, partitionType, table.coreOptions().partitionDefaultName());
+                            spec, partitionType, defaultPartitionName);
             partitions.add(serializer.toBinaryRow(row).copy());
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index d0e79e2a05..66cb49798a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -19,9 +19,13 @@
 package org.apache.paimon.flink.source;

 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
+import org.apache.paimon.flink.lookup.PartitionLoader;
+import org.apache.paimon.flink.lookup.StaticPartitionLoader;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -32,7 +36,7 @@ import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.utils.ParameterUtils;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;

 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -52,8 +56,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;

+import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARTITIONS;
 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
-import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;

 /** A Flink {@link ScanTableSource} for paimon. */
 public abstract class FlinkTableSource
@@ -137,41 +141,46 @@ public abstract class FlinkTableSource

     /**
      * This method is only used for normal source (not lookup source). Specified partitions in
-     * lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
+     * lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}. But
+     * users may also use max_pt() or max_two_pt() in a batch join, so we use PartitionLoader to
+     * create the partition predicate.
      */
     private PartitionPredicate getPartitionPredicateWithOptions() {
-        if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
-            try {
-                Predicate predicate =
-                        PartitionPredicate.createPartitionPredicate(
-                                ParameterUtils.getPartitions(
-                                        options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
-                                                .split(";")),
-                                table.rowType(),
-                                options.get(CoreOptions.PARTITION_DEFAULT_NAME));
-                // Partition filter will be used to filter Manifest stats, the stats schema is
-                // partition type. See SnapshotReaderImpl#withFilter
-                Predicate transformed =
-                        transformFieldMapping(
-                                        predicate,
-                                        PredicateBuilder.fieldIdxToPartitionIdx(
-                                                table.rowType(), table.partitionKeys()))
-                                .orElseThrow(
-                                        () ->
-                                                new RuntimeException(
-                                                        "Failed to transform the partition predicate "
-                                                                + predicate));
-                return PartitionPredicate.fromPredicate(
-                        table.rowType().project(table.partitionKeys()), transformed);
-            } catch (IllegalArgumentException e) {
-                // In older versions of Flink, however, lookup sources will first be treated as
-                // normal sources. So this method will also be visited by lookup tables, whose
-                // option value might be max_pt() or max_two_pt(). In this case we ignore the
-                // filters.
+        try {
+            PartitionLoader partitionLoader = PartitionLoader.of(table);
+            if (partitionLoader == null) {
                 return null;
             }
-        } else {
+            partitionLoader.open();
+            List<BinaryRow> partitions;
+            if (partitionLoader instanceof StaticPartitionLoader) {
+                partitions = partitionLoader.partitions();
+            } else if (partitionLoader instanceof DynamicPartitionLoader) {
+                partitions = ((DynamicPartitionLoader) partitionLoader).getMaxPartitions();
+            } else {
+                throw new RuntimeException(
+                        "Failed to handle scan.partitions = " + options.get(SCAN_PARTITIONS));
+            }
+            if (partitions.isEmpty()) {
+                return null;
+            }
+
+            // Partition filter will be used to filter Manifest stats, the stats schema is
+            // partition type. See SnapshotReaderImpl#withFilter
+            org.apache.paimon.types.RowType partitionType =
+                    table.rowType().project(table.partitionKeys());
+            Predicate predicate =
+                    PartitionPredicate.createPartitionPredicate(
+                            partitionType,
+                            new RowDataToObjectArrayConverter(partitionType),
+                            partitions);
+            return PartitionPredicate.fromPredicate(partitionType, predicate);
+        } catch (IllegalArgumentException e) {
+            // In older versions of Flink, lookup sources will first be treated as normal
+            // sources. So this method will also be visited by lookup tables, and the options
+            // might cause an IllegalArgumentException. In this case we ignore the filters.
+            LOG.info("Failed to get filter with table options {} ", table.options(), e);
             return null;
         }
     }
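Per the updated option description, the same hint now behaves differently by source role: a lookup join re-resolves the max partition(s) on lookup.dynamic-partition.refresh-interval, while a normal source resolves them once before the job runs, even in streaming mode. A hedged sketch of the lookup case (names mirror the tests):

    -- streaming lookup join: max_pt() is refreshed periodically
    SELECT Q.id, P.v
    FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_pt()') */
    FOR SYSTEM_TIME AS OF Q.proctime
    ON Q.id = P.id;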
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index af0d7d508d..1a10eca201 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -928,6 +928,52 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), Row.of(2, 22));
     }

+    @Test
+    public void testScanWithSpecifiedPartitionsWithMaxPt() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT, `proctime` AS PROCTIME())");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2), (3)");
+        String query =
+                ThreadLocalRandom.current().nextBoolean()
+                        ? "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_pt()') */ FOR SYSTEM_TIME AS OF Q.proctime ON Q.id = P.id"
+                        : "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_pt()') */ ON Q.id = P.id";
+        assertThat(sql(query)).containsExactly(Row.of(1, 12), Row.of(2, 22), Row.of(3, 32));
+    }
+
+    @Test
+    public void testScanWithSpecifiedPartitionsWithMaxTwoPt() {
+        sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
+        sql("CREATE TABLE Q (id INT, `proctime` AS PROCTIME())");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11, 'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
+        sql("INSERT INTO Q VALUES (1), (2), (3)");
+        String query =
+                ThreadLocalRandom.current().nextBoolean()
+                        ? "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_two_pt()') */ FOR SYSTEM_TIME AS OF Q.proctime ON Q.id = P.id"
+                        : "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_two_pt()') */ ON Q.id = P.id";
+        assertThat(sql(query))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11), Row.of(1, 12), Row.of(2, 22), Row.of(3, 31), Row.of(3, 32));
+    }
+
+    @Test
+    public void testScanWithSpecifiedPartitionsWithLevelMaxPt() throws Exception {
+        sql(
+                "CREATE TABLE P (id INT, v INT, pt1 STRING, pt2 STRING, pt3 STRING) PARTITIONED BY (pt1, pt2, pt3)");
+        sql("CREATE TABLE Q (id INT, `proctime` AS PROCTIME())");
+        sql(
+                "INSERT INTO P VALUES (1, 10, 'a', '2025-10-01', '1'), (2, 20, 'a', '2025-10-01', '2'), (3, 30, 'a', '2025-10-02', '1'), (4, 40, 'a', '2025-10-02', '2'), "
+                        + "(1, 11, 'b', '2025-10-01', '1'), (2, 21, 'b', '2025-10-01', '2'), (3, 31, 'b', '2025-10-02', '1'), (4, 41, 'b', '2025-10-02', '2')");
+        sql("INSERT INTO Q VALUES (1), (2), (3), (4)");
+        String query =
+                ThreadLocalRandom.current().nextBoolean()
+                        ? "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'pt1=max_pt(),pt2=max_pt()') */ FOR SYSTEM_TIME AS OF Q.proctime ON Q.id = P.id"
+                        : "SELECT Q.id, P.v FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'pt1=max_pt(),pt2=max_pt()') */ ON Q.id = P.id";
+        assertThat(sql(query)).containsExactly(Row.of(3, 31), Row.of(4, 41));
+    }
+
     @Test
     public void testEmptyTableIncrementalBetweenTimestamp() {
         assertThat(sql("SELECT * FROM T /*+ OPTIONS('incremental-between-timestamp'='0,1') */"))