This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f62948186b [flink] Introduce max_two_pt for Flink lookup join (#4772)
f62948186b is described below

commit f62948186b5fe36645914811410f24ab1003c216
Author: yuzelin <[email protected]>
AuthorDate: Wed Dec 25 15:29:28 2024 +0800

    [flink] Introduce max_two_pt for Flink lookup join (#4772)
---
 .../flink/lookup/DynamicPartitionLoader.java       | 65 ++++++++++++++++------
 .../flink/lookup/FileStoreLookupFunction.java      | 54 ++++++++++++------
 .../org/apache/paimon/flink/LookupJoinITCase.java  | 39 +++++++++++++
 3 files changed, 126 insertions(+), 32 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 37a504c588..7c30a1038c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -31,9 +31,10 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -45,22 +46,27 @@ public class DynamicPartitionLoader implements Serializable 
{
 
     private static final String MAX_PT = "max_pt()";
 
+    private static final String MAX_TWO_PT = "max_two_pt()";
+
     private final Table table;
     private final Duration refreshInterval;
+    private final int maxPartitionNum;
 
     private Comparator<InternalRow> comparator;
 
     private LocalDateTime lastRefresh;
-    @Nullable private BinaryRow partition;
+    private List<BinaryRow> partitions;
 
-    private DynamicPartitionLoader(Table table, Duration refreshInterval) {
+    private DynamicPartitionLoader(Table table, Duration refreshInterval, int 
maxPartitionNum) {
         this.table = table;
         this.refreshInterval = refreshInterval;
+        this.maxPartitionNum = maxPartitionNum;
     }
 
     public void open() {
         RowType partitionType = table.rowType().project(table.partitionKeys());
         this.comparator = 
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+        this.partitions = Collections.emptyList();
     }
 
     public void addPartitionKeysTo(List<String> joinKeys, List<String> 
projectFields) {
@@ -71,9 +77,8 @@ public class DynamicPartitionLoader implements Serializable {
         partitionKeys.stream().filter(k -> 
!projectFields.contains(k)).forEach(projectFields::add);
     }
 
-    @Nullable
-    public BinaryRow partition() {
-        return partition;
+    public List<BinaryRow> partitions() {
+        return partitions;
     }
 
     /** @return true if partition changed. */
@@ -83,14 +88,34 @@ public class DynamicPartitionLoader implements Serializable 
{
             return false;
         }
 
-        BinaryRow previous = this.partition;
-        partition =
-                table.newReadBuilder().newScan().listPartitions().stream()
-                        .max(comparator)
-                        .orElse(null);
+        List<BinaryRow> newPartitions = getMaxPartitions();
         lastRefresh = LocalDateTime.now();
 
-        return !Objects.equals(previous, partition);
+        if (newPartitions.size() != partitions.size()) {
+            partitions = newPartitions;
+            return true;
+        } else {
+            for (int i = 0; i < newPartitions.size(); i++) {
+                if (comparator.compare(newPartitions.get(i), 
partitions.get(i)) != 0) {
+                    partitions = newPartitions;
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private List<BinaryRow> getMaxPartitions() {
+        List<BinaryRow> newPartitions =
+                table.newReadBuilder().newScan().listPartitions().stream()
+                        .sorted(comparator.reversed())
+                        .collect(Collectors.toList());
+
+        if (newPartitions.size() <= maxPartitionNum) {
+            return newPartitions;
+        } else {
+            return newPartitions.subList(0, maxPartitionNum);
+        }
     }
 
     @Nullable
@@ -101,13 +126,21 @@ public class DynamicPartitionLoader implements 
Serializable {
             return null;
         }
 
-        if (!dynamicPartition.equalsIgnoreCase(MAX_PT)) {
-            throw new UnsupportedOperationException(
-                    "Unsupported dynamic partition pattern: " + 
dynamicPartition);
+        int maxPartitionNum;
+        switch (dynamicPartition.toLowerCase()) {
+            case MAX_PT:
+                maxPartitionNum = 1;
+                break;
+            case MAX_TWO_PT:
+                maxPartitionNum = 2;
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported dynamic partition pattern: " + 
dynamicPartition);
         }
 
         Duration refresh =
                 
options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
-        return new DynamicPartitionLoader(table, refresh);
+        return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index e3f2fe110c..daf196d371 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,6 +28,7 @@ import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
@@ -203,9 +204,9 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
         if (partitionLoader != null) {
             partitionLoader.open();
             partitionLoader.checkRefresh();
-            BinaryRow partition = partitionLoader.partition();
-            if (partition != null) {
-                
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+            List<BinaryRow> partitions = partitionLoader.partitions();
+            if (!partitions.isEmpty()) {
+                
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
             }
         }
 
@@ -236,17 +237,17 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
             tryRefresh();
 
             InternalRow key = new FlinkRowWrapper(keyRow);
-            if (partitionLoader != null) {
-                if (partitionLoader.partition() == null) {
-                    return Collections.emptyList();
-                }
-                key = JoinedRow.join(key, partitionLoader.partition());
+            if (partitionLoader == null) {
+                return lookupInternal(key);
+            }
+
+            if (partitionLoader.partitions().isEmpty()) {
+                return Collections.emptyList();
             }
 
-            List<InternalRow> results = lookupTable.get(key);
-            List<RowData> rows = new ArrayList<>(results.size());
-            for (InternalRow matchedRow : results) {
-                rows.add(new FlinkRowData(matchedRow));
+            List<RowData> rows = new ArrayList<>();
+            for (BinaryRow partition : partitionLoader.partitions()) {
+                rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
             }
             return rows;
         } catch (OutOfRangeException | ReopenException e) {
@@ -257,7 +258,28 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
         }
     }
 
-    private Predicate createSpecificPartFilter(BinaryRow partition) {
+    private List<RowData> lookupInternal(InternalRow key) throws IOException {
+        List<RowData> rows = new ArrayList<>();
+        List<InternalRow> lookupResults = lookupTable.get(key);
+        for (InternalRow matchedRow : lookupResults) {
+            rows.add(new FlinkRowData(matchedRow));
+        }
+        return rows;
+    }
+
+    private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
+        Predicate partFilter = null;
+        for (BinaryRow partition : partitions) {
+            if (partFilter == null) {
+                partFilter = createSinglePartFilter(partition);
+            } else {
+                partFilter = PredicateBuilder.or(partFilter, 
createSinglePartFilter(partition));
+            }
+        }
+        return partFilter;
+    }
+
+    private Predicate createSinglePartFilter(BinaryRow partition) {
         RowType rowType = table.rowType();
         List<String> partitionKeys = table.partitionKeys();
         Object[] partitionSpec =
@@ -291,15 +313,15 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
         // 2. refresh dynamic partition
         if (partitionLoader != null) {
             boolean partitionChanged = partitionLoader.checkRefresh();
-            BinaryRow partition = partitionLoader.partition();
-            if (partition == null) {
+            List<BinaryRow> partitions = partitionLoader.partitions();
+            if (partitions.isEmpty()) {
                // no data to be loaded, fast exit
                 return;
             }
 
             if (partitionChanged) {
                 // reopen with latest partition
-                
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+                
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
                 lookupTable.close();
                 lookupTable.open();
                 // no need to refresh the lookup table because it is reopened
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index a6abde57b8..86d4810c59 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1006,4 +1006,43 @@ public class LookupJoinITCase extends CatalogITCaseBase {
 
         iterator.close();
     }
+
+    @ParameterizedTest
+    @EnumSource(LookupCacheMode.class)
+    public void testLookupMaxTwoPt0(LookupCacheMode mode) throws Exception {
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt STRING, i INT, v INT)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'lookup.dynamic-partition' = 'max_two_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        String query =
+                "SELECT D.pt, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for 
SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-01', 1, 1), 
('2024-10-01', 2, 2)");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (1)");
+        List<Row> result = iterator.collect(1);
+        assertThat(result).containsExactlyInAnyOrder(Row.of("2024-10-01", 1, 
1));
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-02', 2, 2)");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (2)");
+        result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of("2024-10-01", 2, 2), 
Row.of("2024-10-02", 2, 2));
+
+        sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt = '2024-10-01')");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(null, 1, null), 
Row.of("2024-10-02", 2, 2));
+
+        iterator.close();
+    }
 }

Reply via email to