This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4620f30ccf [refactor] Refactor LookupJoin: add logs for troubleshooting and polish DynamicPartitionLoader (#4828)
4620f30ccf is described below
commit 4620f30ccfc7068f2a33a93d8f5ea4a92fd409b2
Author: yuzelin <[email protected]>
AuthorDate: Tue Jan 7 22:52:09 2025 +0800
[refactor] Refactor LookupJoin: add logs for troubleshooting and polish DynamicPartitionLoader (#4828)
---
.../flink/lookup/DynamicPartitionLoader.java | 75 ++++++++++++++++++++++
.../flink/lookup/FileStoreLookupFunction.java | 50 ++++-----------
.../paimon/flink/lookup/LookupDataTableScan.java | 6 ++
.../paimon/flink/lookup/LookupStreamingReader.java | 20 ++++++
.../flink/lookup/PrimaryKeyPartialLookupTable.java | 27 ++++++--
5 files changed, 138 insertions(+), 40 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 7c30a1038c..aeadc7cdd5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -23,8 +23,15 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -33,15 +40,20 @@ import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
+import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Dynamic partition for lookup. */
public class DynamicPartitionLoader implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLoader.class);
+
private static final long serialVersionUID = 1L;
private static final String MAX_PT = "max_pt()";
@@ -51,6 +63,7 @@ public class DynamicPartitionLoader implements Serializable {
private final Table table;
private final Duration refreshInterval;
private final int maxPartitionNum;
+ private final RowDataToObjectArrayConverter partitionConverter;
private Comparator<InternalRow> comparator;
@@ -61,6 +74,8 @@ public class DynamicPartitionLoader implements Serializable {
this.table = table;
this.refreshInterval = refreshInterval;
this.maxPartitionNum = maxPartitionNum;
+ this.partitionConverter =
+ new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
}
public void open() {
@@ -81,6 +96,31 @@ public class DynamicPartitionLoader implements Serializable {
return partitions;
}
+ public Predicate createSpecificPartFilter() {
+ Predicate partFilter = null;
+ for (BinaryRow partition : partitions) {
+ if (partFilter == null) {
+ partFilter = createSinglePartFilter(partition);
+ } else {
+ partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
+ }
+ }
+ return partFilter;
+ }
+
+ private Predicate createSinglePartFilter(BinaryRow partition) {
+ RowType rowType = table.rowType();
+ List<String> partitionKeys = table.partitionKeys();
+ Object[] partitionSpec = partitionConverter.convert(partition);
+ Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
+ for (int i = 0; i < partitionSpec.length; i++) {
+ partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
+ }
+
+ // create partition predicate based on rowType instead of partitionType
+ return createPartitionPredicate(rowType, partitionMap);
+ }
+
/** @return true if partition changed. */
public boolean checkRefresh() {
if (lastRefresh != null
@@ -88,23 +128,53 @@ public class DynamicPartitionLoader implements Serializable {
return false;
}
+ LOG.info(
+ "DynamicPartitionLoader(maxPartitionNum={},table={}) refreshed
after {} second(s), refreshing",
+ maxPartitionNum,
+ table.name(),
+ refreshInterval.toMillis() / 1000);
+
List<BinaryRow> newPartitions = getMaxPartitions();
lastRefresh = LocalDateTime.now();
if (newPartitions.size() != partitions.size()) {
partitions = newPartitions;
+ logNewPartitions();
return true;
} else {
for (int i = 0; i < newPartitions.size(); i++) {
if (comparator.compare(newPartitions.get(i), partitions.get(i)) != 0) {
partitions = newPartitions;
+ logNewPartitions();
return true;
}
}
+ LOG.info(
+ "DynamicPartitionLoader(maxPartitionNum={},table={})
didn't find new partitions.",
+ maxPartitionNum,
+ table.name());
return false;
}
}
+ private void logNewPartitions() {
+ String partitionsStr =
+ partitions.stream()
+ .map(
+ partition ->
+ InternalRowPartitionComputer.partToSimpleString(
+ table.rowType().project(table.partitionKeys()),
+ partition,
+ "-",
+ 200))
+ .collect(Collectors.joining(","));
+ LOG.info(
+ "DynamicPartitionLoader(maxPartitionNum={},table={}) finds new
partitions: {}.",
+ maxPartitionNum,
+ table.name(),
+ partitionsStr);
+ }
+
private List<BinaryRow> getMaxPartitions() {
List<BinaryRow> newPartitions =
table.newReadBuilder().newScan().listPartitions().stream()
@@ -126,6 +196,11 @@ public class DynamicPartitionLoader implements Serializable {
return null;
}
+ checkArgument(
+ !table.partitionKeys().isEmpty(),
+ "%s is not supported for a non-partitioned table.",
+ LOOKUP_DYNAMIC_PARTITION);
+
int maxPartitionNum;
switch (dynamicPartition.toLowerCase()) {
case MAX_PT:
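Note on the moved code: createSpecificPartFilter() builds one predicate per loaded partition and folds them together with PredicateBuilder.or, producing a single filter the lookup scan can push down; createSinglePartFilter() deliberately builds each predicate against the full rowType rather than the projected partition type so it applies to whole rows. A minimal runnable sketch of the same accumulate-with-or shape, using java.util.function.Predicate as a stand-in for Paimon's Predicate (all names below are illustrative, not Paimon API):

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class OrFoldSketch {

    // Same fold as createSpecificPartFilter(): null seed, then or-accumulate.
    static <T> Predicate<T> anyOf(List<Predicate<T>> filters) {
        Predicate<T> combined = null;
        for (Predicate<T> filter : filters) {
            combined = (combined == null) ? filter : combined.or(filter);
        }
        return combined; // null when the list is empty
    }

    public static void main(String[] args) {
        // Two hypothetical values of a single dt partition column.
        List<Predicate<String>> filters =
                Arrays.asList("20250106"::equals, "20250107"::equals);
        Predicate<String> dt = anyOf(filters);
        System.out.println(dt.test("20250107")); // true
        System.out.println(dt.test("20250101")); // false
    }
}

Since the fold yields null for an empty list, the first call site in FileStoreLookupFunction below only applies the filter after checking !partitions.isEmpty().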
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index daf196d371..2f3ec62bbc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,14 +28,12 @@ import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -58,10 +56,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -74,7 +70,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
/** A lookup {@link TableFunction} for file store. */
@@ -168,12 +163,15 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
int[] projection =
projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
FileStoreTable storeTable = (FileStoreTable) table;
+ LOG.info("Creating lookup table for {}.", table.name());
if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO
&& new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) {
if (isRemoteServiceAvailable(storeTable)) {
this.lookupTable =
PrimaryKeyPartialLookupTable.createRemoteTable(
storeTable, projection, joinKeys);
+ LOG.info(
+ "Remote service is available. Created
PrimaryKeyPartialLookupTable with remote service.");
} else {
try {
this.lookupTable =
@@ -183,7 +181,13 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
path,
joinKeys,
getRequireCachedBucketIds());
- } catch (UnsupportedOperationException ignore2) {
+ LOG.info(
+ "Remote service isn't available. Created
PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
+ } catch (UnsupportedOperationException ignore) {
+ LOG.info(
+ "Remote service isn't available. Cannot create
PrimaryKeyPartialLookupTable with LocalQueryExecutor "
+ + "because bucket mode isn't {}. Will
create FullCacheLookupTable.",
+ BucketMode.HASH_FIXED);
}
}
}
@@ -199,6 +203,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
joinKeys,
getRequireCachedBucketIds());
this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS));
+ LOG.info("Created {}.", lookupTable.getClass().getSimpleName());
}
if (partitionLoader != null) {
@@ -206,7 +211,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
partitionLoader.checkRefresh();
List<BinaryRow> partitions = partitionLoader.partitions();
if (!partitions.isEmpty()) {
- lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
+ lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
}
}
@@ -267,33 +272,6 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
return rows;
}
- private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
- Predicate partFilter = null;
- for (BinaryRow partition : partitions) {
- if (partFilter == null) {
- partFilter = createSinglePartFilter(partition);
- } else {
- partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
- }
- }
- return partFilter;
- }
-
- private Predicate createSinglePartFilter(BinaryRow partition) {
- RowType rowType = table.rowType();
- List<String> partitionKeys = table.partitionKeys();
- Object[] partitionSpec =
- new RowDataToObjectArrayConverter(rowType.project(partitionKeys))
- .convert(partition);
- Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
- for (int i = 0; i < partitionSpec.length; i++) {
- partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
- }
-
- // create partition predicate base on rowType instead of partitionType
- return createPartitionPredicate(rowType, partitionMap);
- }
-
private void reopen() {
try {
close();
@@ -321,7 +299,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
if (partitionChanged) {
// reopen with latest partition
- lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
+ lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
lookupTable.close();
lookupTable.open();
// no need to refresh the lookup table because it is reopened
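The new logs above make the lookup-table choice in open() visible. Assuming cache mode AUTO, the order is: a remote PrimaryKeyPartialLookupTable when the remote query service is up, else a local one, falling back to FullCacheLookupTable when the local executor can't be built (bucket mode isn't HASH_FIXED) or when the join keys aren't exactly the primary keys. A runnable stand-in for that decision order (names and signatures below are hypothetical, not Paimon API):

import java.util.Set;

public class LookupTableChoiceSketch {

    enum Choice { REMOTE_PARTIAL, LOCAL_PARTIAL, FULL_CACHE }

    // Mirrors the branch order in open(); cache mode AUTO is assumed.
    static Choice choose(
            Set<String> primaryKeys,
            Set<String> joinKeys,
            boolean remoteServiceUp,
            boolean hashFixedBucketMode) {
        if (primaryKeys.equals(joinKeys)) {
            if (remoteServiceUp) {
                return Choice.REMOTE_PARTIAL;
            }
            if (hashFixedBucketMode) {
                return Choice.LOCAL_PARTIAL;
            }
            // falls through, like the UnsupportedOperationException catch above
        }
        return Choice.FULL_CACHE;
    }

    public static void main(String[] args) {
        System.out.println(choose(Set.of("id"), Set.of("id"), false, true));  // LOCAL_PARTIAL
        System.out.println(choose(Set.of("id"), Set.of("name"), true, true)); // FULL_CACHE
    }
}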
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index f43d80321e..72041811d6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -30,6 +30,9 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import static org.apache.paimon.CoreOptions.StartupMode;
@@ -41,6 +44,8 @@ import static org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamSc
*/
public class LookupDataTableScan extends DataTableStreamScan {
+ private static final Logger LOG = LoggerFactory.getLogger(LookupDataTableScan.class);
+
private final StartupMode startupMode;
private final LookupStreamScanMode lookupScanMode;
@@ -69,6 +74,7 @@ public class LookupDataTableScan extends DataTableStreamScan {
if (plan != null) {
return plan;
}
+ LOG.info("Dim table found OVERWRITE snapshot {}, reopen.",
snapshot.id());
throw new ReopenException();
}
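Context for the log line just added: when an incremental plan can't be produced because the dim table hit an OVERWRITE snapshot, the scan throws ReopenException, and — judging from the reopen() helper visible in the FileStoreLookupFunction hunks above — the caller reacts by closing and rebuilding the lookup table rather than replaying the overwrite. A compact stand-in sketch of that throw-then-reopen cycle (local types only, not the Paimon classes):

public class ReopenCycleSketch {

    static class ReopenException extends RuntimeException {}

    // Stand-in for LookupDataTableScan: give up on incremental planning
    // when an overwrite snapshot is seen.
    static void planIncrementally(boolean overwriteSnapshotSeen) {
        if (overwriteSnapshotSeen) {
            // the new LOG.info above fires right before this point
            throw new ReopenException();
        }
    }

    public static void main(String[] args) {
        try {
            planIncrementally(true);
        } catch (ReopenException e) {
            System.out.println("reopening lookup table with latest state");
        }
    }
}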
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index 132b30138d..9615de48be 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -36,6 +37,9 @@ import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -50,6 +54,8 @@ import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping
/** A streaming reader to load data into {@link LookupTable}. */
public class LookupStreamingReader {
+ private static final Logger LOG = LoggerFactory.getLogger(LookupStreamingReader.class);
+
private final LookupFileStoreTable table;
private final int[] projection;
@Nullable private final Filter<InternalRow> cacheRowFilter;
@@ -103,6 +109,7 @@ public class LookupStreamingReader {
public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
List<Split> splits = scan.plan().splits();
+ log(splits);
CoreOptions options = CoreOptions.fromMap(table.options());
FunctionWithIOException<Split, RecordReader<InternalRow>> readerSupplier =
split -> readBuilder.newRead().createReader(split);
@@ -136,6 +143,19 @@ public class LookupStreamingReader {
return reader;
}
+ private void log(List<Split> splits) {
+ if (splits.isEmpty()) {
+ LOG.info("LookupStreamingReader didn't get splits from {}.",
table.name());
+ return;
+ }
+
+ DataSplit dataSplit = (DataSplit) splits.get(0);
+ LOG.info(
+ "LookupStreamingReader get splits from {} with snapshotId {}.",
+ table.name(),
+ dataSplit.snapshotId());
+ }
+
@Nullable
public Long nextSnapshotId() {
return scan.checkpoint();
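One caveat about the log helper just added: it casts splits.get(0) to DataSplit unchecked, apparently relying on this streaming-read path only ever yielding DataSplits. If that invariant ever weakens, a guarded variant is cheap. A runnable sketch with local stand-in types (Split/DataSplit below are not the Paimon interfaces):

import java.util.Arrays;
import java.util.List;

public class SplitLogSketch {

    interface Split {}

    static class DataSplit implements Split {
        final long snapshotId;

        DataSplit(long snapshotId) {
            this.snapshotId = snapshotId;
        }
    }

    // Guarded variant of the cast in LookupStreamingReader.log().
    static String describe(List<Split> splits) {
        if (splits.isEmpty()) {
            return "no splits";
        }
        Split first = splits.get(0);
        if (first instanceof DataSplit) {
            return "snapshotId " + ((DataSplit) first).snapshotId;
        }
        return "unexpected split type " + first.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        List<Split> splits = Arrays.<Split>asList(new DataSplit(42L));
        System.out.println(describe(splits)); // snapshotId 42
    }
}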
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 7bd7a652b5..255351767c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -34,6 +34,9 @@ import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.Closeable;
@@ -186,8 +189,11 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
static class LocalQueryExecutor implements QueryExecutor {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalQueryExecutor.class);
+
private final LocalTableQuery tableQuery;
private final StreamTableScan scan;
+ private final String tableName;
private LocalQueryExecutor(
FileStoreTable table,
@@ -214,6 +220,8 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
? null
: requireCachedBucketIds::contains)
.newStreamScan();
+
+ this.tableName = table.name();
}
@Override
@@ -226,15 +234,13 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
public void refresh() {
while (true) {
List<Split> splits = scan.plan().splits();
+ log(splits);
+
if (splits.isEmpty()) {
return;
}
for (Split split : splits) {
- if (!(split instanceof DataSplit)) {
- throw new IllegalArgumentException(
- "Unsupported split: " + split.getClass());
- }
BinaryRow partition = ((DataSplit) split).partition();
int bucket = ((DataSplit) split).bucket();
List<DataFileMeta> before = ((DataSplit) split).beforeFiles();
@@ -249,6 +255,19 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
public void close() throws IOException {
tableQuery.close();
}
+
+ private void log(List<Split> splits) {
+ if (splits.isEmpty()) {
+ LOG.info("LocalQueryExecutor didn't get splits from {}.",
tableName);
+ return;
+ }
+
+ DataSplit dataSplit = (DataSplit) splits.get(0);
+ LOG.info(
+ "LocalQueryExecutor get splits from {} with snapshotId
{}.",
+ tableName,
+ dataSplit.snapshotId());
+ }
}
static class RemoteQueryExecutor implements QueryExecutor {
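Worth noting in the refresh() hunk above: the removed instanceof guard was effectively dead, presumably because LocalQueryExecutor's scan only produces DataSplits, and the new log() helper leans on the same assumption when it casts splits.get(0). The loop itself keeps its drain-until-empty shape, which the sketch below reproduces with stand-in types (a queue of planned batches in place of scan.plan()):

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

public class RefreshLoopSketch {
    public static void main(String[] args) {
        Queue<List<String>> plannedBatches =
                new ArrayDeque<>(Arrays.asList(
                        Arrays.asList("split-1", "split-2"),
                        Arrays.asList("split-3")));
        while (true) {
            // Keep planning until the scan returns no splits.
            List<String> splits =
                    plannedBatches.isEmpty()
                            ? Collections.<String>emptyList()
                            : plannedBatches.poll();
            if (splits.isEmpty()) {
                System.out.println("no splits, refresh done"); // mirrors the new log()
                return;
            }
            System.out.println("applying " + splits);
        }
    }
}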