This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aac417c1a1 [flink] Fix Flink Lookup Join for Postpone bucket table (#5537)
aac417c1a1 is described below
commit aac417c1a149bafdc182114fba5ab20770d36c17
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 25 21:28:13 2025 +0800
[flink] Fix Flink Lookup Join for Postpone bucket table (#5537)
---
.../apache/paimon/flink/action/CompactAction.java | 2 +-
.../flink/lookup/FixedBucketFromPkExtractor.java | 110 ------------------
.../flink/lookup/PrimaryKeyPartialLookupTable.java | 100 ++++++++++++++---
.../paimon/flink/source/BaseDataTableSource.java | 6 +-
.../paimon/flink/PostponeBucketTableITCase.java | 125 +++++++++++++++++++++
.../flink/lookup/FileStoreLookupFunctionTest.java | 23 ++++
6 files changed, 241 insertions(+), 125 deletions(-)
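
In short, the fix replaces the schema-level bucket count (the deleted FixedBucketFromPkExtractor required a fixed 'bucket' option) with a bucket count resolved per partition at lookup time, taken from DataSplit.totalBuckets() and falling back to the table's default for splits written by older versions. The bucket itself is still derived from the hash of the bucket-key fields projected out of the primary key. A minimal sketch of that computation, using only APIs that appear in the patch below (the helper class name is illustrative, not part of the commit):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.table.sink.KeyAndBucketExtractor;

    /** Illustrative sketch only: bucket from a per-partition bucket count. */
    final class BucketFromKeySketch {
        static int bucketOf(BinaryRow bucketKey, int numBuckets) {
            // Same hash-then-modulo scheme as fixed-bucket writes, but numBuckets
            // now comes from the partition's splits instead of the table schema.
            return KeyAndBucketExtractor.bucket(
                    KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);
        }
    }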
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 306216a57d..0244a20547 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -259,7 +259,7 @@ public class CompactAction extends TableActionBase {
InternalRowPartitionComputer partitionComputer =
new InternalRowPartitionComputer(
fileStoreTable.coreOptions().partitionDefaultName(),
- fileStoreTable.rowType(),
+ fileStoreTable.store().partitionType(),
fileStoreTable.partitionKeys().toArray(new String[0]),
fileStoreTable.coreOptions().legacyPartitionName());
String commitUser = CoreOptions.createCommitUser(options);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java
deleted file mode 100644
index 61617988a6..0000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.lookup;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.codegen.CodeGenUtils;
-import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** Extractor to extract bucket from the primary key. */
-public class FixedBucketFromPkExtractor implements KeyAndBucketExtractor<InternalRow> {
-
- private transient InternalRow primaryKey;
-
- private final boolean sameBucketKeyAndTrimmedPrimaryKey;
-
- private final int numBuckets;
-
- private final Projection bucketKeyProjection;
-
- private final Projection trimmedPrimaryKeyProjection;
-
- private final Projection partitionProjection;
-
- private final Projection logPrimaryKeyProjection;
-
- public FixedBucketFromPkExtractor(TableSchema schema) {
- this.numBuckets = new CoreOptions(schema.options()).bucket();
- checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
- this.sameBucketKeyAndTrimmedPrimaryKey =
- schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
- this.bucketKeyProjection =
- CodeGenUtils.newProjection(
- schema.logicalPrimaryKeysType(),
- schema.bucketKeys().stream()
- .mapToInt(schema.primaryKeys()::indexOf)
- .toArray());
- this.trimmedPrimaryKeyProjection =
- CodeGenUtils.newProjection(
- schema.logicalPrimaryKeysType(),
- schema.trimmedPrimaryKeys().stream()
- .mapToInt(schema.primaryKeys()::indexOf)
- .toArray());
- this.partitionProjection =
- CodeGenUtils.newProjection(
- schema.logicalPrimaryKeysType(),
- schema.partitionKeys().stream()
- .mapToInt(schema.primaryKeys()::indexOf)
- .toArray());
- this.logPrimaryKeyProjection =
- CodeGenUtils.newProjection(
- schema.logicalRowType(), schema.projection(schema.primaryKeys()));
- }
-
- @Override
- public void setRecord(InternalRow record) {
- this.primaryKey = record;
- }
-
- @Override
- public BinaryRow partition() {
- return partitionProjection.apply(primaryKey);
- }
-
- private BinaryRow bucketKey() {
- if (sameBucketKeyAndTrimmedPrimaryKey) {
- return trimmedPrimaryKey();
- }
-
- return bucketKeyProjection.apply(primaryKey);
- }
-
- @Override
- public int bucket() {
- BinaryRow bucketKey = bucketKey();
- return KeyAndBucketExtractor.bucket(
- KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);
- }
-
- @Override
- public BinaryRow trimmedPrimaryKey() {
- return trimmedPrimaryKeyProjection.apply(primaryKey);
- }
-
- @Override
- public BinaryRow logPrimaryKey() {
- return logPrimaryKeyProjection.apply(primaryKey);
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 255351767c..24fd87a213 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -19,15 +19,19 @@
package org.apache.paimon.flink.lookup;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -43,14 +47,18 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Lookup table for primary key which supports to read the LSM tree directly. */
public class PrimaryKeyPartialLookupTable implements LookupTable {
private final QueryExecutorFactory executorFactory;
- private final FixedBucketFromPkExtractor extractor;
@Nullable private final ProjectedRow keyRearrange;
@Nullable private final ProjectedRow trimmedKeyRearrange;
@@ -58,16 +66,30 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
@Nullable private Filter<InternalRow> cacheRowFilter;
private QueryExecutor queryExecutor;
+ private final Projection partitionFromPk;
+ private final Projection bucketKeyFromPk;
+
private PrimaryKeyPartialLookupTable(
QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) {
this.executorFactory = executorFactory;
-
if (table.bucketMode() != BucketMode.HASH_FIXED) {
throw new UnsupportedOperationException(
"Unsupported mode for partial lookup: " + table.bucketMode());
}
- this.extractor = new FixedBucketFromPkExtractor(table.schema());
+ TableSchema schema = table.schema();
+ this.partitionFromPk =
+ CodeGenUtils.newProjection(
+ schema.logicalPrimaryKeysType(),
+ schema.partitionKeys().stream()
+ .mapToInt(schema.primaryKeys()::indexOf)
+ .toArray());
+ this.bucketKeyFromPk =
+ CodeGenUtils.newProjection(
+ schema.logicalPrimaryKeysType(),
+ schema.bucketKeys().stream()
+ .mapToInt(schema.primaryKeys()::indexOf)
+ .toArray());
ProjectedRow keyRearrange = null;
if (!table.primaryKeys().equals(joinKey)) {
@@ -80,7 +102,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
}
this.keyRearrange = keyRearrange;
- List<String> trimmedPrimaryKeys = table.schema().trimmedPrimaryKeys();
+ List<String> trimmedPrimaryKeys = schema.trimmedPrimaryKeys();
ProjectedRow trimmedKeyRearrange = null;
if (!trimmedPrimaryKeys.equals(joinKey)) {
trimmedKeyRearrange =
@@ -115,9 +137,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
if (keyRearrange != null) {
adjustedKey = keyRearrange.replaceRow(adjustedKey);
}
- extractor.setRecord(adjustedKey);
- int bucket = extractor.bucket();
- BinaryRow partition = extractor.partition();
+
+ BinaryRow partition = partitionFromPk.apply(adjustedKey);
+ Integer numBuckets = queryExecutor.numBuckets(partition);
+ if (numBuckets == null) {
+ // no data, just return none
+ return Collections.emptyList();
+ }
+ int bucket = bucket(numBuckets, adjustedKey);
InternalRow trimmedKey = key;
if (trimmedKeyRearrange != null) {
@@ -132,6 +159,12 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
}
}
+ private int bucket(int numBuckets, InternalRow primaryKey) {
+ BinaryRow bucketKey = bucketKeyFromPk.apply(primaryKey);
+ return KeyAndBucketExtractor.bucket(
+ KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);
+ }
+
@Override
public void refresh() {
queryExecutor.refresh();
@@ -182,6 +215,9 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
interface QueryExecutor extends Closeable {
+ @Nullable
+ Integer numBuckets(BinaryRow partition);
+
InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;
void refresh();
@@ -195,6 +231,9 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
private final StreamTableScan scan;
private final String tableName;
+ private final Integer defaultNumBuckets;
+ private final Map<BinaryRow, Integer> numBuckets;
+
private LocalQueryExecutor(
FileStoreTable table,
int[] projection,
@@ -222,6 +261,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
.newStreamScan();
this.tableName = table.name();
+ this.defaultNumBuckets = table.bucketSpec().getNumBuckets();
+ this.numBuckets = new HashMap<>();
+ }
+
+ @Override
+ @Nullable
+ public Integer numBuckets(BinaryRow partition) {
+ return numBuckets.get(partition);
}
@Override
@@ -241,16 +288,30 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
}
for (Split split : splits) {
- BinaryRow partition = ((DataSplit) split).partition();
- int bucket = ((DataSplit) split).bucket();
- List<DataFileMeta> before = ((DataSplit) split).beforeFiles();
- List<DataFileMeta> after = ((DataSplit) split).dataFiles();
-
- tableQuery.refreshFiles(partition, bucket, before, after);
+ refreshSplit((DataSplit) split);
}
}
}
+ @VisibleForTesting
+ void refreshSplit(DataSplit split) {
+ BinaryRow partition = split.partition();
+ int bucket = split.bucket();
+ List<DataFileMeta> before = split.beforeFiles();
+ List<DataFileMeta> after = split.dataFiles();
+
+ tableQuery.refreshFiles(partition, bucket, before, after);
+ Integer totalBuckets = split.totalBuckets();
+ if (totalBuckets == null) {
+ // Just for compatibility with older versions
+ checkArgument(
+ defaultNumBuckets > 0,
+ "This is a bug, old version table numBuckets should be greater than 0.");
+ totalBuckets = defaultNumBuckets;
+ }
+ numBuckets.put(partition, totalBuckets);
+ }
+
@Override
public void close() throws IOException {
tableQuery.close();
@@ -273,9 +334,22 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
static class RemoteQueryExecutor implements QueryExecutor {
private final RemoteTableQuery tableQuery;
+ private final Integer numBuckets;
private RemoteQueryExecutor(FileStoreTable table, int[] projection) {
this.tableQuery = new RemoteTableQuery(table).withValueProjection(projection);
+ int numBuckets = table.bucketSpec().getNumBuckets();
+ if (numBuckets == POSTPONE_BUCKET) {
+ throw new UnsupportedOperationException(
+ "Remote query does not support POSTPONE_BUCKET.");
+ }
+ this.numBuckets = numBuckets;
+ }
+
+ @Override
+ @Nullable
+ public Integer numBuckets(BinaryRow partition) {
+ return numBuckets;
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 81cced9a74..ca80129a51 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -38,6 +38,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.BucketSpec;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -86,6 +87,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
@@ -409,7 +411,9 @@ public abstract class BaseDataTableSource extends FlinkTableSource
private boolean supportBucketShufflePartitioner(
List<String> joinKeyFieldNames, List<String> bucketKeyFieldNames) {
- return BucketMode.HASH_FIXED.equals(((FileStoreTable) table).bucketMode())
+ BucketSpec bucketSpec = ((FileStoreTable) table).bucketSpec();
+ return bucketSpec.getBucketMode() == BucketMode.HASH_FIXED
+ && bucketSpec.getNumBuckets() != POSTPONE_BUCKET
&& new HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index 3519f162d0..db87abc7ad 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -425,6 +425,131 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
.containsExactlyInAnyOrder("+I[5]");
}
+ @Timeout(TIMEOUT)
+ @Test
+ public void testLookupPostponeBucketTable() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment bEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ String createCatalogSql =
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")";
+ bEnv.executeSql(createCatalogSql);
+ bEnv.executeSql("USE CATALOG mycat");
+ bEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+ bEnv.executeSql("CREATE TABLE SRC (i INT, `proctime` AS PROCTIME())");
+
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .parallelism(1)
+ .checkpointIntervalMs(200)
+ .build();
+ sEnv.executeSql(createCatalogSql);
+ sEnv.executeSql("USE CATALOG mycat");
+ TableResult streamingSelect =
+ sEnv.executeSql(
+ "SELECT i, v FROM SRC LEFT JOIN T "
+ + "FOR SYSTEM_TIME AS OF SRC.proctime AS D ON SRC.i = D.k");
+
+ JobClient client = streamingSelect.getJobClient().get();
+ CloseableIterator<Row> it = streamingSelect.collect();
+
+ bEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30)").await();
+ bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ // lookup join
+ bEnv.executeSql("INSERT INTO SRC VALUES (1), (2), (3)").await();
+ assertThat(collect(client, it, 3))
+ .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]");
+
+ // rescale and re-join
+ bEnv.executeSql("CALL sys.rescale(`table` => 'default.T', `bucket_num` => 5)").await();
+ bEnv.executeSql("INSERT INTO SRC VALUES (1), (2), (3)").await();
+ assertThat(collect(client, it, 3))
+ .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]");
+
+ it.close();
+ }
+
+ @Timeout(TIMEOUT)
+ @Test
+ public void testLookupPostponeBucketPartitionedTable() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment bEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ String createCatalogSql =
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")";
+ bEnv.executeSql(createCatalogSql);
+ bEnv.executeSql("USE CATALOG mycat");
+ bEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " pt INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (k, pt) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+ bEnv.executeSql("CREATE TABLE SRC (i INT, pt INT, `proctime` AS PROCTIME())");
+
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .parallelism(1)
+ .checkpointIntervalMs(200)
+ .build();
+ sEnv.executeSql(createCatalogSql);
+ sEnv.executeSql("USE CATALOG mycat");
+ TableResult streamingSelect =
+ sEnv.executeSql(
+ "SELECT i, D.pt, v FROM SRC LEFT JOIN T "
+ + "FOR SYSTEM_TIME AS OF SRC.proctime AS D ON SRC.i = D.k AND SRC.pt = D.pt");
+
+ JobClient client = streamingSelect.getJobClient().get();
+ CloseableIterator<Row> it = streamingSelect.collect();
+
+ bEnv.executeSql("INSERT INTO T VALUES (1, 1, 10), (2, 2, 20), (3, 2, 30)").await();
+ bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ // rescale for partitions to different num buckets and lookup join
+ bEnv.executeSql(
+ "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 5, `partition` => 'pt=1')")
+ .await();
+ bEnv.executeSql(
+ "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 8, `partition` => 'pt=2')")
+ .await();
+ bEnv.executeSql("INSERT INTO SRC VALUES (1, 1), (2, 2), (3, 2)").await();
+ assertThat(collect(client, it, 3))
+ .containsExactlyInAnyOrder("+I[1, 1, 10]", "+I[2, 2, 20]", "+I[3, 2, 30]");
+
+ it.close();
+ }
+
private List<String> collect(TableResult result) throws Exception {
List<String> ret = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index dbf9f99a32..dcbc405d31 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -37,6 +37,7 @@ import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -49,6 +50,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.time.Duration;
@@ -62,6 +64,7 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -148,6 +151,26 @@ public class FileStoreLookupFunctionTest {
}
}
+ @Test
+ public void testCompatibilityForOldVersion() throws Exception {
+ createLookupFunction(false, true, false, false);
+ commit(writeCommit(1));
+ PrimaryKeyPartialLookupTable lookupTable =
+ (PrimaryKeyPartialLookupTable) lookupFunction.lookupTable();
+ LocalQueryExecutor queryExecutor = (LocalQueryExecutor) lookupTable.queryExecutor();
+
+ // set totalBuckets to null, for testing old version
+ DataSplit split = (DataSplit) table.newReadBuilder().newScan().plan().splits().get(0);
+ Field field = DataSplit.class.getDeclaredField("totalBuckets");
+ field.setAccessible(true);
+ field.set(split, null);
+ assertThat(split.totalBuckets()).isNull();
+
+ // assert num buckets should be 2
+ queryExecutor.refreshSplit(split);
+ assertThat(queryExecutor.numBuckets(EMPTY_ROW)).isEqualTo(2);
+ }
+
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {