This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d139659c95 Revert "[flink] Flink batch job support specify partition with max_pt() and max_two_pt()" without review
d139659c95 is described below
commit d139659c956fe821eeefe8b4d816ac80bc361dcd
Author: yuzelin <[email protected]>
AuthorDate: Tue Dec 2 22:19:00 2025 +0800
Revert "[flink] Flink batch job support specify partition with max_pt() and
max_two_pt()" without review
This reverts commit 0d6db479f4879599b2da4fa8cbfad413276720bc.
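For context, the reverted feature let a batch join read only the newest
partition(s) through the 'scan.partitions' hint, as exercised by the tests
removed below, e.g.:

    SELECT Q.id, P.v
    FROM Q INNER JOIN P /*+ OPTIONS('scan.partitions' = 'max_pt()') */
    ON Q.id = P.id ORDER BY Q.id, P.v;

'max_two_pt()' selected the two newest partitions, and the per-level form
'pt1=max_pt(),pt2=max_pt()' resolved the newest value at each partition level.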
---
.../flink/lookup/DynamicPartitionLevelLoader.java | 13 ++--
.../flink/lookup/DynamicPartitionLoader.java | 6 +-
.../flink/lookup/DynamicPartitionNumberLoader.java | 8 +--
.../flink/lookup/FileStoreLookupFunction.java | 4 +-
.../paimon/flink/lookup/PartitionLoader.java | 14 ++---
.../paimon/flink/lookup/StaticPartitionLoader.java | 13 ++--
.../paimon/flink/source/FlinkTableSource.java | 70 ++++++++++------------
.../apache/paimon/flink/BatchFileStoreITCase.java | 40 -------------
8 files changed, 60 insertions(+), 108 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
index 88920c05fc..e4ffe2a5e8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLevelLoader.java
@@ -20,8 +20,7 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
@@ -35,8 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
-
/** Dynamic partition loader which can specify the partition level to load for lookup. */
public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
@@ -50,12 +47,14 @@ public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
private final String defaultPartitionName;
DynamicPartitionLevelLoader(
- Table table, Duration refreshInterval, Map<String, String> partitionLoadConfig) {
+ FileStoreTable table,
+ Duration refreshInterval,
+ Map<String, String> partitionLoadConfig) {
super(table, refreshInterval);
maxPartitionLoadLevel =
getMaxPartitionLoadLevel(partitionLoadConfig, table.partitionKeys());
fieldGetters = createPartitionFieldGetters();
- defaultPartitionName = Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
+ defaultPartitionName = table.coreOptions().partitionDefaultName();
LOG.info(
"Init
DynamicPartitionLevelLoader(table={}),maxPartitionLoadLevel is {}",
@@ -64,7 +63,7 @@ public class DynamicPartitionLevelLoader extends DynamicPartitionLoader {
}
@Override
- public List<BinaryRow> getMaxPartitions() {
+ protected List<BinaryRow> getMaxPartitions() {
List<BinaryRow> newPartitions =
table.newReadBuilder().newScan().listPartitions().stream()
.sorted(comparator.reversed())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 37671dee0f..201997da55 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -46,7 +46,7 @@ public abstract class DynamicPartitionLoader extends PartitionLoader {
protected transient Comparator<InternalRow> comparator;
protected transient LocalDateTime lastRefresh;
- DynamicPartitionLoader(Table table, Duration refreshInterval) {
+ DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval) {
super(table);
this.refreshInterval = refreshInterval;
}
@@ -92,7 +92,7 @@ public abstract class DynamicPartitionLoader extends PartitionLoader {
}
}
- public abstract List<BinaryRow> getMaxPartitions();
+ protected abstract List<BinaryRow> getMaxPartitions();
private void logNewPartitions() {
String partitionsStr = partitionsToString(partitions);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
index 7e210725de..52c1d4c76c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionNumberLoader.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +37,8 @@ public class DynamicPartitionNumberLoader extends DynamicPartitionLoader {
private final int maxPartitionNum;
- DynamicPartitionNumberLoader(Table table, Duration refreshInterval, int maxPartitionNum) {
+ DynamicPartitionNumberLoader(
+ FileStoreTable table, Duration refreshInterval, int maxPartitionNum) {
super(table, refreshInterval);
this.maxPartitionNum = maxPartitionNum;
LOG.info(
@@ -46,8 +47,7 @@ public class DynamicPartitionNumberLoader extends DynamicPartitionLoader {
maxPartitionNum);
}
- @Override
- public List<BinaryRow> getMaxPartitions() {
+ protected List<BinaryRow> getMaxPartitions() {
List<BinaryRow> newPartitions =
table.newReadBuilder().newScan().listPartitions().stream()
.sorted(comparator.reversed())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 121a86d782..7df4742101 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -227,7 +227,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
List<BinaryRow> partitions = partitionLoader.partitions();
if (!partitions.isEmpty()) {
lookupTable.specifyPartitions(
- partitions, partitionLoader.createSpecificPartFilter(partitions));
+ partitions, partitionLoader.createSpecificPartFilter());
}
}
@@ -329,7 +329,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
if (partitionChanged) {
// reopen with latest partition
lookupTable.specifyPartitions(
- partitions, partitionLoader.createSpecificPartFilter(partitions));
+ partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
lookupTable.close();
lookupTable.open();
// no need to refresh the lookup table because it is reopened
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
index 72f7777f01..701c40b67c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionLoader.java
@@ -24,7 +24,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
@@ -44,15 +44,15 @@ public abstract class PartitionLoader implements Serializable {
private static final long serialVersionUID = 1L;
- private static final String MAX_PT = "max_pt()";
- private static final String MAX_TWO_PT = "max_two_pt()";
+ protected static final String MAX_PT = "max_pt()";
+ protected static final String MAX_TWO_PT = "max_two_pt()";
- protected final Table table;
+ protected final FileStoreTable table;
private final RowDataToObjectArrayConverter partitionConverter;
protected transient List<BinaryRow> partitions;
- protected PartitionLoader(Table table) {
+ protected PartitionLoader(FileStoreTable table) {
this.table = table;
this.partitionConverter =
new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
@@ -76,7 +76,7 @@ public abstract class PartitionLoader implements Serializable {
partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add);
}
- public Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
+ public Predicate createSpecificPartFilter() {
Predicate partFilter = null;
for (BinaryRow partition : partitions) {
if (partFilter == null) {
@@ -105,7 +105,7 @@ public abstract class PartitionLoader implements Serializable {
public abstract boolean checkRefresh();
@Nullable
- public static PartitionLoader of(Table table) {
+ public static PartitionLoader of(FileStoreTable table) {
Options options = Options.fromMap(table.options());
String scanPartitions = options.get(FlinkConnectorOptions.SCAN_PARTITIONS);
if (scanPartitions == null) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
index bdfecc39f9..b6387a20a5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/StaticPartitionLoader.java
@@ -20,8 +20,7 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -29,14 +28,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
-
/** {@link PartitionLoader} for specified static partitions. */
public class StaticPartitionLoader extends PartitionLoader {
private final List<Map<String, String>> scanPartitions;
- protected StaticPartitionLoader(Table table, List<Map<String, String>> scanPartitions) {
+ protected StaticPartitionLoader(
+ FileStoreTable table, List<Map<String, String>> scanPartitions) {
super(table);
this.scanPartitions = scanPartitions;
}
@@ -44,13 +42,12 @@ public class StaticPartitionLoader extends PartitionLoader {
@Override
public void open() {
partitions = new ArrayList<>();
- RowType partitionType = table.rowType().project(table.partitionKeys());
- String defaultPartitionName = Options.fromMap(table.options()).get(PARTITION_DEFAULT_NAME);
+ RowType partitionType = table.schema().logicalPartitionType();
InternalRowSerializer serializer = new InternalRowSerializer(partitionType);
for (Map<String, String> spec : scanPartitions) {
GenericRow row =
InternalRowPartitionComputer.convertSpecToInternalRow(
- spec, partitionType, defaultPartitionName);
+ spec, partitionType, table.coreOptions().partitionDefaultName());
partitions.add(serializer.toBinaryRow(row).copy());
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 2ee8f235d5..d0e79e2a05 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -19,13 +19,9 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
-import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
-import org.apache.paimon.flink.lookup.PartitionLoader;
-import org.apache.paimon.flink.lookup.StaticPartitionLoader;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
@@ -34,9 +30,9 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.DataTable;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -56,7 +52,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARTITIONS;
import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
@@ -142,42 +137,43 @@ public abstract class FlinkTableSource
/**
* This method is only used for normal source (not lookup source). Specified partitions in
- * lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}. But we
- * use PartitionLoader to create partition predicate. It's possible that user use max_pt() or
- * max_two_pt() in batch join, so we should also handle it.
+ * lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
*/
private PartitionPredicate getPartitionPredicateWithOptions() {
- PartitionLoader partitionLoader = PartitionLoader.of((FileStoreTable) table);
- if (partitionLoader == null) {
- return null;
- }
+ if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
+ try {
+ Predicate predicate =
+ PartitionPredicate.createPartitionPredicate(
+ ParameterUtils.getPartitions(
+ options.get(FlinkConnectorOptions.SCAN_PARTITIONS).split(";")),
+ table.rowType(),
+ options.get(CoreOptions.PARTITION_DEFAULT_NAME));
+ // Partition filter will be used to filter Manifest stats, the stats schema is
+ // partition type. See SnapshotReaderImpl#withFilter
+ Predicate transformed =
+ transformFieldMapping(
+ predicate,
+ PredicateBuilder.fieldIdxToPartitionIdx(
+ table.rowType(), table.partitionKeys()))
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Failed to transform
the partition predicate "
+ + predicate));
+ return PartitionPredicate.fromPredicate(
+ table.rowType().project(table.partitionKeys()), transformed);
+ } catch (IllegalArgumentException e) {
+ // In older versions of Flink, however, lookup sources will first be treated as
+ // normal sources. So this method will also be visited by lookup tables, whose
+ // option value might be max_pt() or max_two_pt(). In this case we ignore the
+ // filters.
+ return null;
+ }
- partitionLoader.open();
- List<BinaryRow> partitions;
- if (partitionLoader instanceof StaticPartitionLoader) {
- partitions = partitionLoader.partitions();
- } else if (partitionLoader instanceof DynamicPartitionLoader) {
- partitions = ((DynamicPartitionLoader) partitionLoader).getMaxPartitions();
} else {
- throw new RuntimeException(
- "Failed to handle scan.partitions = " +
options.get(SCAN_PARTITIONS));
+ return null;
}
-
- // Partition filter will be used to filter Manifest stats, the stats schema is
- // partition type. See SnapshotReaderImpl#withFilter
- Predicate predicate = partitionLoader.createSpecificPartFilter(partitions);
- Predicate transformed =
- transformFieldMapping(
- predicate,
- PredicateBuilder.fieldIdxToPartitionIdx(
- table.rowType(), table.partitionKeys()))
- .orElseThrow(
- () ->
- new RuntimeException(
- "Failed to transform the
partition predicate "
- + predicate));
- return PartitionPredicate.fromPredicate(
- table.rowType().project(table.partitionKeys()), transformed);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index a81cf7a506..589eeb14af 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -902,46 +902,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThat(sql(query)).containsExactly(Row.of(1, 11), Row.of(1, 12), Row.of(2, 22));
}
- @Test
- public void testScanWithSpecifiedPartitionsWithMaxPt() {
- sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
- sql("CREATE TABLE Q (id INT)");
- sql(
- "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11,
'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
- sql("INSERT INTO Q VALUES (1), (2), (3)");
- String query =
- "SELECT Q.id, P.v FROM Q INNER JOIN P /*+
OPTIONS('scan.partitions' = 'max_pt()') */ ON Q.id = P.id ORDER BY Q.id, P.v";
- assertThat(sql(query)).containsExactly(Row.of(1, 12), Row.of(2, 22), Row.of(3, 32));
- }
-
- @Test
- public void testScanWithSpecifiedPartitionsWithMaxTwoPt() {
- sql("CREATE TABLE P (id INT, v INT, pt STRING) PARTITIONED BY (pt)");
- sql("CREATE TABLE Q (id INT)");
- sql(
- "INSERT INTO P VALUES (1, 10, 'a'), (2, 20, 'a'), (1, 11,
'b'), (3, 31, 'b'), (1, 12, 'c'), (2, 22, 'c'), (3, 32, 'c')");
- sql("INSERT INTO Q VALUES (1), (2), (3)");
- String query =
- "SELECT Q.id, P.v FROM Q INNER JOIN P /*+
OPTIONS('scan.partitions' = 'max_two_pt()') */ ON Q.id = P.id ORDER BY Q.id,
P.v";
- assertThat(sql(query))
- .containsExactly(
- Row.of(1, 11), Row.of(1, 12), Row.of(2, 22), Row.of(3, 31), Row.of(3, 32));
- }
-
- @Test
- public void testScanWithSpecifiedPartitionsWithLevelMaxPt() throws Exception {
- sql(
- "CREATE TABLE P (id INT, v INT, pt1 STRING, pt2 STRING, pt3
STRING) PARTITIONED BY (pt1, pt2, pt3)");
- sql("CREATE TABLE Q (id INT)");
- sql(
- "INSERT INTO P VALUES (1, 10, 'a', '2025-10-01', '1'), (2, 20,
'a', '2025-10-01', '2'), (3, 30, 'a', '2025-10-02', '1'), (4, 40, 'a',
'2025-10-02', '2'), "
- + "(1, 11, 'b', '2025-10-01', '1'), (2, 21, 'b',
'2025-10-01', '2'), (3, 31, 'b', '2025-10-02', '1'), (4, 41, 'b', '2025-10-02',
'2')");
- sql("INSERT INTO Q VALUES (1), (2), (3), (4)");
- String query =
- "SELECT Q.id, P.v FROM Q INNER JOIN P /*+
OPTIONS('scan.partitions' = 'pt1=max_pt(),pt2=max_pt()') */ ON Q.id = P.id
ORDER BY Q.id, P.v";
- assertThat(sql(query)).containsExactly(Row.of(3, 31), Row.of(4, 41));
- }
-
@Test
public void testEmptyTableIncrementalBetweenTimestamp() {
assertThat(sql("SELECT * FROM T /*+
OPTIONS('incremental-between-timestamp'='0,1') */"))