This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f62948186b [flink] Introduce max_two_pt for Flink lookup join (#4772)
f62948186b is described below
commit f62948186b5fe36645914811410f24ab1003c216
Author: yuzelin <[email protected]>
AuthorDate: Wed Dec 25 15:29:28 2024 +0800
[flink] Introduce max_two_pt for Flink lookup join (#4772)
---
.../flink/lookup/DynamicPartitionLoader.java | 65 ++++++++++++++++------
.../flink/lookup/FileStoreLookupFunction.java | 54 ++++++++++++------
.../org/apache/paimon/flink/LookupJoinITCase.java | 39 +++++++++++++
3 files changed, 126 insertions(+), 32 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 37a504c588..7c30a1038c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -31,9 +31,10 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Objects;
+import java.util.stream.Collectors;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -45,22 +46,27 @@ public class DynamicPartitionLoader implements Serializable {
private static final String MAX_PT = "max_pt()";
+ private static final String MAX_TWO_PT = "max_two_pt()";
+
private final Table table;
private final Duration refreshInterval;
+ private final int maxPartitionNum;
private Comparator<InternalRow> comparator;
private LocalDateTime lastRefresh;
- @Nullable private BinaryRow partition;
+ private List<BinaryRow> partitions;
- private DynamicPartitionLoader(Table table, Duration refreshInterval) {
+ private DynamicPartitionLoader(Table table, Duration refreshInterval, int
maxPartitionNum) {
this.table = table;
this.refreshInterval = refreshInterval;
+ this.maxPartitionNum = maxPartitionNum;
}
public void open() {
RowType partitionType = table.rowType().project(table.partitionKeys());
this.comparator =
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+ this.partitions = Collections.emptyList();
}
public void addPartitionKeysTo(List<String> joinKeys, List<String>
projectFields) {
@@ -71,9 +77,8 @@ public class DynamicPartitionLoader implements Serializable {
partitionKeys.stream().filter(k ->
!projectFields.contains(k)).forEach(projectFields::add);
}
- @Nullable
- public BinaryRow partition() {
- return partition;
+ public List<BinaryRow> partitions() {
+ return partitions;
}
/** @return true if partition changed. */
@@ -83,14 +88,34 @@ public class DynamicPartitionLoader implements Serializable {
return false;
}
- BinaryRow previous = this.partition;
- partition =
- table.newReadBuilder().newScan().listPartitions().stream()
- .max(comparator)
- .orElse(null);
+ List<BinaryRow> newPartitions = getMaxPartitions();
lastRefresh = LocalDateTime.now();
- return !Objects.equals(previous, partition);
+ if (newPartitions.size() != partitions.size()) {
+ partitions = newPartitions;
+ return true;
+ } else {
+ for (int i = 0; i < newPartitions.size(); i++) {
+ if (comparator.compare(newPartitions.get(i),
partitions.get(i)) != 0) {
+ partitions = newPartitions;
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ private List<BinaryRow> getMaxPartitions() {
+ List<BinaryRow> newPartitions =
+ table.newReadBuilder().newScan().listPartitions().stream()
+ .sorted(comparator.reversed())
+ .collect(Collectors.toList());
+
+ if (newPartitions.size() <= maxPartitionNum) {
+ return newPartitions;
+ } else {
+ return newPartitions.subList(0, maxPartitionNum);
+ }
}
@Nullable
@@ -101,13 +126,21 @@ public class DynamicPartitionLoader implements Serializable {
return null;
}
- if (!dynamicPartition.equalsIgnoreCase(MAX_PT)) {
- throw new UnsupportedOperationException(
- "Unsupported dynamic partition pattern: " +
dynamicPartition);
+ int maxPartitionNum;
+ switch (dynamicPartition.toLowerCase()) {
+ case MAX_PT:
+ maxPartitionNum = 1;
+ break;
+ case MAX_TWO_PT:
+ maxPartitionNum = 2;
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported dynamic partition pattern: " +
dynamicPartition);
}
Duration refresh =
options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
- return new DynamicPartitionLoader(table, refresh);
+ return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index e3f2fe110c..daf196d371 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,6 +28,7 @@ import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
@@ -203,9 +204,9 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
if (partitionLoader != null) {
partitionLoader.open();
partitionLoader.checkRefresh();
- BinaryRow partition = partitionLoader.partition();
- if (partition != null) {
-
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+ List<BinaryRow> partitions = partitionLoader.partitions();
+ if (!partitions.isEmpty()) {
+
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
}
}
@@ -236,17 +237,17 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
tryRefresh();
InternalRow key = new FlinkRowWrapper(keyRow);
- if (partitionLoader != null) {
- if (partitionLoader.partition() == null) {
- return Collections.emptyList();
- }
- key = JoinedRow.join(key, partitionLoader.partition());
+ if (partitionLoader == null) {
+ return lookupInternal(key);
+ }
+
+ if (partitionLoader.partitions().isEmpty()) {
+ return Collections.emptyList();
}
- List<InternalRow> results = lookupTable.get(key);
- List<RowData> rows = new ArrayList<>(results.size());
- for (InternalRow matchedRow : results) {
- rows.add(new FlinkRowData(matchedRow));
+ List<RowData> rows = new ArrayList<>();
+ for (BinaryRow partition : partitionLoader.partitions()) {
+ rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
}
return rows;
} catch (OutOfRangeException | ReopenException e) {
@@ -257,7 +258,28 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
}
}
- private Predicate createSpecificPartFilter(BinaryRow partition) {
+ private List<RowData> lookupInternal(InternalRow key) throws IOException {
+ List<RowData> rows = new ArrayList<>();
+ List<InternalRow> lookupResults = lookupTable.get(key);
+ for (InternalRow matchedRow : lookupResults) {
+ rows.add(new FlinkRowData(matchedRow));
+ }
+ return rows;
+ }
+
+ private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
+ Predicate partFilter = null;
+ for (BinaryRow partition : partitions) {
+ if (partFilter == null) {
+ partFilter = createSinglePartFilter(partition);
+ } else {
+ partFilter = PredicateBuilder.or(partFilter,
createSinglePartFilter(partition));
+ }
+ }
+ return partFilter;
+ }
+
+ private Predicate createSinglePartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
Object[] partitionSpec =
@@ -291,15 +313,15 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
// 2. refresh dynamic partition
if (partitionLoader != null) {
boolean partitionChanged = partitionLoader.checkRefresh();
- BinaryRow partition = partitionLoader.partition();
- if (partition == null) {
+ List<BinaryRow> partitions = partitionLoader.partitions();
+ if (partitions.isEmpty()) {
// no data to be load, fast exit
return;
}
if (partitionChanged) {
// reopen with latest partition
-
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
lookupTable.close();
lookupTable.open();
// no need to refresh the lookup table because it is reopened
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index a6abde57b8..86d4810c59 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1006,4 +1006,43 @@ public class LookupJoinITCase extends CatalogITCaseBase {
iterator.close();
}
+
+ @ParameterizedTest
+ @EnumSource(LookupCacheMode.class)
+ public void testLookupMaxTwoPt0(LookupCacheMode mode) throws Exception {
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, i INT, v INT)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'lookup.dynamic-partition' = 'max_two_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ String query =
+ "SELECT D.pt, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for
SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-01', 1, 1),
('2024-10-01', 2, 2)");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (1)");
+ List<Row> result = iterator.collect(1);
+ assertThat(result).containsExactlyInAnyOrder(Row.of("2024-10-01", 1,
1));
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-02', 2, 2)");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (2)");
+ result = iterator.collect(2);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of("2024-10-01", 2, 2),
Row.of("2024-10-02", 2, 2));
+
+ sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt = '2024-10-01')");
+ Thread.sleep(500); // wait refresh
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(null, 1, null),
Row.of("2024-10-02", 2, 2));
+
+ iterator.close();
+ }
}