This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4620f30ccf [refactor] Refactor LookupJoin: add logs for troubleshooting and polish DynamicPartitionLoader (#4828)
4620f30ccf is described below

commit 4620f30ccfc7068f2a33a93d8f5ea4a92fd409b2
Author: yuzelin <[email protected]>
AuthorDate: Tue Jan 7 22:52:09 2025 +0800

    [refactor] Refactor LookupJoin: add logs for troubleshooting and polish DynamicPartitionLoader (#4828)
---
 .../flink/lookup/DynamicPartitionLoader.java       | 75 ++++++++++++++++++++++
 .../flink/lookup/FileStoreLookupFunction.java      | 50 ++++-----------
 .../paimon/flink/lookup/LookupDataTableScan.java   |  6 ++
 .../paimon/flink/lookup/LookupStreamingReader.java | 20 ++++++
 .../flink/lookup/PrimaryKeyPartialLookupTable.java | 27 ++++++--
 5 files changed, 138 insertions(+), 40 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 7c30a1038c..aeadc7cdd5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -23,8 +23,15 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -33,15 +40,20 @@ import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
+import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Dynamic partition for lookup. */
 public class DynamicPartitionLoader implements Serializable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLoader.class);
+
     private static final long serialVersionUID = 1L;
 
     private static final String MAX_PT = "max_pt()";
@@ -51,6 +63,7 @@ public class DynamicPartitionLoader implements Serializable {
     private final Table table;
     private final Duration refreshInterval;
     private final int maxPartitionNum;
+    private final RowDataToObjectArrayConverter partitionConverter;
 
     private Comparator<InternalRow> comparator;
 
@@ -61,6 +74,8 @@ public class DynamicPartitionLoader implements Serializable {
         this.table = table;
         this.refreshInterval = refreshInterval;
         this.maxPartitionNum = maxPartitionNum;
+        this.partitionConverter =
+                new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
     }
 
     public void open() {
@@ -81,6 +96,31 @@ public class DynamicPartitionLoader implements Serializable {
         return partitions;
     }
 
+    public Predicate createSpecificPartFilter() {
+        Predicate partFilter = null;
+        for (BinaryRow partition : partitions) {
+            if (partFilter == null) {
+                partFilter = createSinglePartFilter(partition);
+            } else {
+                partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
+            }
+        }
+        return partFilter;
+    }
+
+    private Predicate createSinglePartFilter(BinaryRow partition) {
+        RowType rowType = table.rowType();
+        List<String> partitionKeys = table.partitionKeys();
+        Object[] partitionSpec = partitionConverter.convert(partition);
+        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
+        for (int i = 0; i < partitionSpec.length; i++) {
+            partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
+        }
+
+        // create the partition predicate based on rowType instead of partitionType
+        return createPartitionPredicate(rowType, partitionMap);
+    }
+
     /** @return true if partition changed. */
     public boolean checkRefresh() {
         if (lastRefresh != null
@@ -88,23 +128,53 @@ public class DynamicPartitionLoader implements Serializable {
             return false;
         }
 
+        LOG.info(
+                "DynamicPartitionLoader(maxPartitionNum={},table={}) refreshed 
after {} second(s), refreshing",
+                maxPartitionNum,
+                table.name(),
+                refreshInterval.toMillis() / 1000);
+
         List<BinaryRow> newPartitions = getMaxPartitions();
         lastRefresh = LocalDateTime.now();
 
         if (newPartitions.size() != partitions.size()) {
             partitions = newPartitions;
+            logNewPartitions();
             return true;
         } else {
             for (int i = 0; i < newPartitions.size(); i++) {
                 if (comparator.compare(newPartitions.get(i), partitions.get(i)) != 0) {
                     partitions = newPartitions;
+                    logNewPartitions();
                     return true;
                 }
             }
+            LOG.info(
+                    "DynamicPartitionLoader(maxPartitionNum={},table={}) 
didn't find new partitions.",
+                    maxPartitionNum,
+                    table.name());
             return false;
         }
     }
 
+    private void logNewPartitions() {
+        String partitionsStr =
+                partitions.stream()
+                        .map(
+                                partition ->
+                                        InternalRowPartitionComputer.partToSimpleString(
+                                                table.rowType().project(table.partitionKeys()),
+                                                partition,
+                                                "-",
+                                                200))
+                        .collect(Collectors.joining(","));
+        LOG.info(
+                "DynamicPartitionLoader(maxPartitionNum={},table={}) finds new 
partitions: {}.",
+                maxPartitionNum,
+                table.name(),
+                partitionsStr);
+    }
+
     private List<BinaryRow> getMaxPartitions() {
         List<BinaryRow> newPartitions =
                 table.newReadBuilder().newScan().listPartitions().stream()
@@ -126,6 +196,11 @@ public class DynamicPartitionLoader implements Serializable {
             return null;
         }
 
+        checkArgument(
+                !table.partitionKeys().isEmpty(),
+                "{} is not supported for non-partitioned table.",
+                LOOKUP_DYNAMIC_PARTITION);
+
         int maxPartitionNum;
         switch (dynamicPartition.toLowerCase()) {
             case MAX_PT:
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index daf196d371..2f3ec62bbc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,14 +28,12 @@ import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
@@ -58,10 +56,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -74,7 +70,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_
 import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A lookup {@link TableFunction} for file store. */
@@ -168,12 +163,15 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
         FileStoreTable storeTable = (FileStoreTable) table;
 
+        LOG.info("Creating lookup table for {}.", table.name());
         if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
                 && new HashSet<>(table.primaryKeys()).equals(new 
HashSet<>(joinKeys))) {
             if (isRemoteServiceAvailable(storeTable)) {
                 this.lookupTable =
                         PrimaryKeyPartialLookupTable.createRemoteTable(
                                 storeTable, projection, joinKeys);
+                LOG.info(
+                        "Remote service is available. Created 
PrimaryKeyPartialLookupTable with remote service.");
             } else {
                 try {
                     this.lookupTable =
@@ -183,7 +181,13 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
                                     path,
                                     joinKeys,
                                     getRequireCachedBucketIds());
-                } catch (UnsupportedOperationException ignore2) {
+                    LOG.info(
+                            "Remote service isn't available. Created 
PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
+                } catch (UnsupportedOperationException ignore) {
+                    LOG.info(
+                            "Remote service isn't available. Cannot create 
PrimaryKeyPartialLookupTable with LocalQueryExecutor "
+                                    + "because bucket mode isn't {}. Will 
create FullCacheLookupTable.",
+                            BucketMode.HASH_FIXED);
                 }
             }
         }
@@ -199,6 +203,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
                             joinKeys,
                             getRequireCachedBucketIds());
             this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS));
+            LOG.info("Created {}.", lookupTable.getClass().getSimpleName());
         }
 
         if (partitionLoader != null) {
@@ -206,7 +211,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
             partitionLoader.checkRefresh();
             List<BinaryRow> partitions = partitionLoader.partitions();
             if (!partitions.isEmpty()) {
-                lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
+                lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
             }
         }
 
@@ -267,33 +272,6 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         return rows;
     }
 
-    private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
-        Predicate partFilter = null;
-        for (BinaryRow partition : partitions) {
-            if (partFilter == null) {
-                partFilter = createSinglePartFilter(partition);
-            } else {
-                partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
-            }
-        }
-        return partFilter;
-    }
-
-    private Predicate createSinglePartFilter(BinaryRow partition) {
-        RowType rowType = table.rowType();
-        List<String> partitionKeys = table.partitionKeys();
-        Object[] partitionSpec =
-                new RowDataToObjectArrayConverter(rowType.project(partitionKeys))
-                        .convert(partition);
-        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
-        for (int i = 0; i < partitionSpec.length; i++) {
-            partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
-        }
-
-        // create partition predicate base on rowType instead of partitionType
-        return createPartitionPredicate(rowType, partitionMap);
-    }
-
     private void reopen() {
         try {
             close();
@@ -321,7 +299,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
 
             if (partitionChanged) {
                 // reopen with latest partition
-                lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
+                lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
                 lookupTable.close();
                 lookupTable.open();
                 // no need to refresh the lookup table because it is reopened
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index f43d80321e..72041811d6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -30,6 +30,9 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import static org.apache.paimon.CoreOptions.StartupMode;
@@ -41,6 +44,8 @@ import static org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamSc
  */
 public class LookupDataTableScan extends DataTableStreamScan {
 
+    private static final Logger LOG = LoggerFactory.getLogger(LookupDataTableScan.class);
+
     private final StartupMode startupMode;
     private final LookupStreamScanMode lookupScanMode;
 
@@ -69,6 +74,7 @@ public class LookupDataTableScan extends DataTableStreamScan {
         if (plan != null) {
             return plan;
         }
+        LOG.info("Dim table found OVERWRITE snapshot {}, reopen.", 
snapshot.id());
         throw new ReopenException();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index 132b30138d..9615de48be 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -36,6 +37,9 @@ import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -50,6 +54,8 @@ import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping
 /** A streaming reader to load data into {@link LookupTable}. */
 public class LookupStreamingReader {
 
+    private static final Logger LOG = LoggerFactory.getLogger(LookupStreamingReader.class);
+
     private final LookupFileStoreTable table;
     private final int[] projection;
     @Nullable private final Filter<InternalRow> cacheRowFilter;
@@ -103,6 +109,7 @@ public class LookupStreamingReader {
 
     public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
         List<Split> splits = scan.plan().splits();
+        log(splits);
         CoreOptions options = CoreOptions.fromMap(table.options());
         FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
                 split -> readBuilder.newRead().createReader(split);
@@ -136,6 +143,19 @@ public class LookupStreamingReader {
         return reader;
     }
 
+    private void log(List<Split> splits) {
+        if (splits.isEmpty()) {
+            LOG.info("LookupStreamingReader didn't get splits from {}.", table.name());
+            return;
+        }
+
+        DataSplit dataSplit = (DataSplit) splits.get(0);
+        LOG.info(
+                "LookupStreamingReader got splits from {} with snapshotId {}.",
+                table.name(),
+                dataSplit.snapshotId());
+    }
+
     @Nullable
     public Long nextSnapshotId() {
         return scan.checkpoint();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 7bd7a652b5..255351767c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -34,6 +34,9 @@ import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.ProjectedRow;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
@@ -186,8 +189,11 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
 
     static class LocalQueryExecutor implements QueryExecutor {
 
+        private static final Logger LOG = LoggerFactory.getLogger(LocalQueryExecutor.class);
+
         private final LocalTableQuery tableQuery;
         private final StreamTableScan scan;
+        private final String tableName;
 
         private LocalQueryExecutor(
                 FileStoreTable table,
@@ -214,6 +220,8 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
                                             ? null
                                             : requireCachedBucketIds::contains)
                             .newStreamScan();
+
+            this.tableName = table.name();
         }
 
         @Override
@@ -226,15 +234,13 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
         public void refresh() {
             while (true) {
                 List<Split> splits = scan.plan().splits();
+                log(splits);
+
                 if (splits.isEmpty()) {
                     return;
                 }
 
                 for (Split split : splits) {
-                    if (!(split instanceof DataSplit)) {
-                        throw new IllegalArgumentException(
-                                "Unsupported split: " + split.getClass());
-                    }
                     BinaryRow partition = ((DataSplit) split).partition();
                     int bucket = ((DataSplit) split).bucket();
                     List<DataFileMeta> before = ((DataSplit) split).beforeFiles();
@@ -249,6 +255,19 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
         public void close() throws IOException {
             tableQuery.close();
         }
+
+        private void log(List<Split> splits) {
+            if (splits.isEmpty()) {
+                LOG.info("LocalQueryExecutor didn't get splits from {}.", tableName);
+                return;
+            }
+
+            DataSplit dataSplit = (DataSplit) splits.get(0);
+            LOG.info(
+                    "LocalQueryExecutor got splits from {} with snapshotId {}.",
+                    tableName,
+                    dataSplit.snapshotId());
+        }
     }
 
     static class RemoteQueryExecutor implements QueryExecutor {
