This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 77eb818744 [flink] fall back to full cache mode when partial-lookup is
unavailable. (#5769)
77eb818744 is described below
commit 77eb8187448446658191a7166ecd095be711dd0c
Author: zhoulii <[email protected]>
AuthorDate: Thu Jun 19 13:12:15 2025 +0800
[flink] fall back to full cache mode when partial-lookup is unavailable.
(#5769)
---
.../apache/paimon/table/query/LocalTableQuery.java | 19 +------------
.../flink/lookup/FileStoreLookupFunction.java | 7 ++---
.../flink/lookup/PrimaryKeyPartialLookupTable.java | 17 +++++++++++
.../org/apache/paimon/flink/LookupJoinITCase.java | 33 ++++++++++++++++++++++
4 files changed, 54 insertions(+), 22 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index d474d4e2d0..4b89a48464 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -55,7 +55,6 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
-import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
import static org.apache.paimon.mergetree.LookupFile.localFilePrefix;
@@ -105,23 +104,7 @@ public class LocalTableQuery implements TableQuery {
options.lookupCacheMaxMemory(),
options.lookupCacheHighPrioPoolRatio()),
new
RowCompactedSerializer(keyType).createSliceComparator());
-
- if (options.needLookup()) {
- startLevel = 1;
- } else {
- if (options.sequenceField().size() > 0) {
- throw new UnsupportedOperationException(
- "Not support sequence field definition, but is: "
- + options.sequenceField());
- }
-
- if (options.mergeEngine() != DEDUPLICATE) {
- throw new UnsupportedOperationException(
- "Only support deduplicate merge engine, but is: " +
options.mergeEngine());
- }
-
- startLevel = 0;
- }
+ startLevel = options.needLookup() ? 1 : 0;
}
public void refreshFiles(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index e3e15c4ce6..8faafa3571 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -30,7 +30,6 @@ import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
@@ -199,11 +198,11 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
table, projection, path, joinKeys,
getRequireCachedBucketIds());
LOG.info(
"Remote service isn't available. Created
PrimaryKeyPartialLookupTable with LocalQueryExecutor.");
- } catch (UnsupportedOperationException ignore) {
+ } catch (UnsupportedOperationException e) {
LOG.info(
"Remote service isn't available. Cannot create
PrimaryKeyPartialLookupTable with LocalQueryExecutor "
- + "because bucket mode isn't {}. Will
create FullCacheLookupTable.",
- BucketMode.HASH_FIXED);
+ + "because {}. Will create
FullCacheLookupTable.",
+ e.getMessage());
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 43aeefc26a..c6da347893 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.lookup;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
@@ -78,6 +79,22 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
"Unsupported mode for partial lookup: " + bucketMode);
}
+ CoreOptions coreOptions = CoreOptions.fromMap(table.options());
+
+ if (!coreOptions.needLookup()
+ && coreOptions.mergeEngine() !=
CoreOptions.MergeEngine.DEDUPLICATE) {
+ throw new UnsupportedOperationException(
+ "Only support deduplicate merge engine when table does not
need lookup, but merge engine is: "
+ + coreOptions.mergeEngine());
+ }
+
+ if (coreOptions.mergeEngine() == CoreOptions.MergeEngine.DEDUPLICATE
+ && !coreOptions.sequenceField().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "Unsupported sequence fields definition for partial lookup
when use deduplicate merge engine, but sequence fields are: "
+ + coreOptions.sequenceField());
+ }
+
TableSchema schema = table.schema();
this.partitionFromPk =
CodeGenUtils.newProjection(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 652ef5bd9c..2744650a8e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1108,4 +1108,37 @@ public class LookupJoinITCase extends CatalogITCaseBase {
assertThat(result)
.containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212),
Row.of(3, 213));
}
+
+ @Test
+ public void testFallbackCacheMode() throws Exception {
+ sql(
+ "CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT
ENFORCED, j INT, k1 INT, k2 INT) WITH"
+ + " ('continuous.discovery-interval'='1 ms',
'sequence.field' = 'j', 'bucket' = '1')");
+ sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 11, 111, 1111), (2, 22,
222, 2222)");
+
+ String query =
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN
DIM_WITH_SEQUENCE for system_time as of T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ List<Row> result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222),
+ Row.of(3, null, null, null));
+
+ sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (2, 11, 444, 4444), (3, 33,
333, 3333)");
+ Thread.sleep(2000); // wait refresh
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222), // not change
+ Row.of(3, 33, 333, 3333),
+ Row.of(4, null, null, null));
+
+ iterator.close();
+ }
}