This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fc2ee7cba5 [flink] Don't reopen the dim table if the scanned
partitions are not overwritten (#6596)
fc2ee7cba5 is described below
commit fc2ee7cba515b960cb8e7f0cf3627d12a18d75d0
Author: yuzelin <[email protected]>
AuthorDate: Tue Nov 25 15:12:07 2025 +0800
[flink] Don't reopen the dim table if the scanned partitions are not
overwritten (#6596)
---
.../flink/lookup/FileStoreLookupFunction.java | 6 +-
.../paimon/flink/lookup/FullCacheLookupTable.java | 14 ++--
.../paimon/flink/lookup/LookupDataTableScan.java | 74 ++++++++++++++------
.../paimon/flink/lookup/LookupFileStoreTable.java | 11 +--
.../paimon/flink/lookup/LookupStreamingReader.java | 10 +--
.../apache/paimon/flink/lookup/LookupTable.java | 5 +-
.../flink/lookup/PrimaryKeyPartialLookupTable.java | 9 +--
.../org/apache/paimon/flink/LookupJoinITCase.java | 32 +++++++--
.../paimon/flink/lookup/LookupTableTest.java | 78 ++++++++++++++++++++--
9 files changed, 182 insertions(+), 57 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 829209dbb6..7df4742101 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -226,7 +226,8 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
partitionLoader.checkRefresh();
List<BinaryRow> partitions = partitionLoader.partitions();
if (!partitions.isEmpty()) {
-
lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
+ lookupTable.specifyPartitions(
+ partitions,
partitionLoader.createSpecificPartFilter());
}
}
@@ -327,7 +328,8 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
if (partitionChanged) {
// reopen with latest partition
-
lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
+ lookupTable.specifyPartitions(
+ partitionLoader.partitions(),
partitionLoader.createSpecificPartFilter());
lookupTable.close();
lookupTable.open();
// no need to refresh the lookup table because it is reopened
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index bbae15d07c..cda824cc94 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -92,7 +92,8 @@ public abstract class FullCacheLookupTable implements
LookupTable {
private final FileStoreTable table;
private Future<?> refreshFuture;
private LookupStreamingReader reader;
- private Predicate specificPartition;
+ @Nullable private List<BinaryRow> scanPartitions;
+ @Nullable private Predicate partitionFilter;
@Nullable private Filter<InternalRow> cacheRowFilter;
public FullCacheLookupTable(Context context) {
@@ -139,8 +140,10 @@ public abstract class FullCacheLookupTable implements
LookupTable {
}
@Override
- public void specificPartitionFilter(Predicate filter) {
- this.specificPartition = filter;
+ public void specifyPartitions(
+ List<BinaryRow> scanPartitions, @Nullable Predicate
partitionFilter) {
+ this.scanPartitions = scanPartitions;
+ this.partitionFilter = partitionFilter;
}
@Override
@@ -172,14 +175,15 @@ public abstract class FullCacheLookupTable implements
LookupTable {
protected void bootstrap() throws Exception {
Predicate scanPredicate =
- PredicateBuilder.andNullable(context.tablePredicate,
specificPartition);
+ PredicateBuilder.andNullable(context.tablePredicate,
partitionFilter);
this.reader =
new LookupStreamingReader(
context.table,
context.projection,
scanPredicate,
context.requiredCachedBucketIds,
- cacheRowFilter);
+ cacheRowFilter,
+ scanPartitions);
if (!stateFactory.preferBulkLoad()) {
doRefresh();
return;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index c1fd45f757..b64e7edace 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -20,24 +20,27 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataTableStreamScan;
-import org.apache.paimon.table.source.TableQueryAuth;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.SimpleFileReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.List;
+
import static org.apache.paimon.CoreOptions.StartupMode;
import static
org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamScanMode;
@@ -49,29 +52,28 @@ public class LookupDataTableScan extends
DataTableStreamScan {
private static final Logger LOG =
LoggerFactory.getLogger(LookupDataTableScan.class);
+ private final FileStoreTable table;
private final StartupMode startupMode;
private final LookupStreamScanMode lookupScanMode;
+ @Nullable private List<BinaryRow> scanPartitions = null;
+
public LookupDataTableScan(
- TableSchema schema,
- CoreOptions options,
+ FileStoreTable table,
SnapshotReader snapshotReader,
- SnapshotManager snapshotManager,
- ChangelogManager changelogManager,
- boolean supportStreamingReadOverwrite,
- LookupStreamScanMode lookupScanMode,
- TableQueryAuth queryAuth,
- boolean hasPk) {
+ LookupStreamScanMode lookupScanMode) {
super(
- schema,
- options,
+ table.schema(),
+ table.coreOptions(),
snapshotReader,
- snapshotManager,
- changelogManager,
- supportStreamingReadOverwrite,
- queryAuth,
- hasPk);
+ table.snapshotManager(),
+ table.changelogManager(),
+ table.supportStreamingReadOverwrite(),
+ table.catalogEnvironment().tableQueryAuth(table.coreOptions()),
+ !table.schema().primaryKeys().isEmpty());
+ CoreOptions options = table.coreOptions();
+ this.table = table;
this.startupMode = options.startupMode();
this.lookupScanMode = lookupScanMode;
dropStats();
@@ -81,6 +83,10 @@ public class LookupDataTableScan extends DataTableStreamScan
{
}
}
+ public void setScanPartitions(List<BinaryRow> scanPartitions) {
+ this.scanPartitions = scanPartitions;
+ }
+
@Override
@Nullable
protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
@@ -88,8 +94,34 @@ public class LookupDataTableScan extends DataTableStreamScan
{
if (plan != null) {
return plan;
}
- LOG.info("Dim table found OVERWRITE snapshot {}, reopen.",
snapshot.id());
- throw new ReopenException();
+
+ if (shouldReopen(snapshot)) {
+ LOG.info("Dim table found OVERWRITE snapshot {}, reopen.",
snapshot.id());
+ throw new ReopenException();
+ } else {
+ return null;
+ }
+ }
+
+ private boolean shouldReopen(Snapshot snapshot) {
+ if (scanPartitions == null) {
+ return true;
+ }
+
+ List<ManifestFileMeta> manifests =
+ table.manifestListReader().read(snapshot.deltaManifestList());
+ SimpleFileReader<ManifestEntry> manifestReader =
table.manifestFileReader();
+ for (ManifestFileMeta manifest : manifests) {
+ List<ManifestEntry> entries =
manifestReader.read(manifest.fileName());
+ for (ManifestEntry entry : entries) {
+ BinaryRow partition = entry.partition();
+ if (scanPartitions.contains(partition)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
index 3c2fcb6a83..353c99d2b1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
@@ -73,16 +73,7 @@ public class LookupFileStoreTable extends
DelegatedFileStoreTable {
@Override
public StreamDataTableScan newStreamScan() {
- return new LookupDataTableScan(
- wrapped.schema(),
- wrapped.coreOptions(),
- wrapped.newSnapshotReader(),
- wrapped.snapshotManager(),
- wrapped.changelogManager(),
- wrapped.supportStreamingReadOverwrite(),
- lookupScanMode,
-
wrapped.catalogEnvironment().tableQueryAuth(wrapped.coreOptions()),
- !wrapped.schema().primaryKeys().isEmpty());
+ return new LookupDataTableScan(wrapped, wrapped.newSnapshotReader(),
lookupScanMode);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index adc3e19b52..ac0f00d895 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.lookup;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
@@ -29,7 +30,6 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.FunctionWithIOException;
@@ -61,14 +61,15 @@ public class LookupStreamingReader {
@Nullable private final Filter<InternalRow> cacheRowFilter;
private final ReadBuilder readBuilder;
@Nullable private final Predicate projectedPredicate;
- private final StreamTableScan scan;
+ private final LookupDataTableScan scan;
public LookupStreamingReader(
LookupFileStoreTable table,
int[] projection,
@Nullable Predicate predicate,
Set<Integer> requireCachedBucketIds,
- @Nullable Filter<InternalRow> cacheRowFilter) {
+ @Nullable Filter<InternalRow> cacheRowFilter,
+ @Nullable List<BinaryRow> scanPartitions) {
this.table = table;
this.projection = projection;
this.cacheRowFilter = cacheRowFilter;
@@ -81,7 +82,8 @@ public class LookupStreamingReader {
requireCachedBucketIds == null
? null
: requireCachedBucketIds::contains);
- scan = readBuilder.newStreamScan();
+ scan = (LookupDataTableScan) readBuilder.newStreamScan();
+ scan.setScanPartitions(scanPartitions);
if (predicate != null) {
List<String> fieldNames = table.rowType().getFieldNames();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index 8ea1931b96..f8af411941 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -18,10 +18,13 @@
package org.apache.paimon.flink.lookup;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;
+import javax.annotation.Nullable;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -29,7 +32,7 @@ import java.util.List;
/** A lookup table which provides get and refresh. */
public interface LookupTable extends Closeable {
- void specificPartitionFilter(Predicate filter);
+ void specifyPartitions(List<BinaryRow> scanPartitions, @Nullable Predicate
partitionFilter);
void open() throws Exception;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 886fdc8795..ded0f29da7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -63,7 +63,7 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
@Nullable private final ProjectedRow keyRearrange;
@Nullable private final ProjectedRow trimmedKeyRearrange;
- private Predicate specificPartition;
+ @Nullable private Predicate partitionFilter;
@Nullable private Filter<InternalRow> cacheRowFilter;
private QueryExecutor queryExecutor;
@@ -143,13 +143,14 @@ public class PrimaryKeyPartialLookupTable implements
LookupTable {
}
@Override
- public void specificPartitionFilter(Predicate filter) {
- this.specificPartition = filter;
+ public void specifyPartitions(
+ List<BinaryRow> scanPartitions, @Nullable Predicate
partitionFilter) {
+ this.partitionFilter = partitionFilter;
}
@Override
public void open() throws Exception {
- this.queryExecutor = executorFactory.create(specificPartition,
cacheRowFilter);
+ this.queryExecutor = executorFactory.create(partitionFilter,
cacheRowFilter);
refresh();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index dfa6907c91..408d74ea0c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1163,16 +1163,18 @@ public class LookupJoinITCase extends CatalogITCaseBase
{
iterator.close();
}
- @Test
- public void testMaxPtAndOverwrite() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testMaxPtAndOverwrite(boolean refreshPartitionQuickly) throws
Exception {
sql(
"CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT) "
+ "PARTITIONED BY (`pt`) WITH ("
+ "'bucket' = '2', "
+ "'bucket-key' = 'k', "
+ "'lookup.dynamic-partition' = 'max_pt()', "
- + "'lookup.dynamic-partition.refresh-interval' =
'99999 s', "
- + "'continuous.discovery-interval'='1 ms')");
+ + "'lookup.dynamic-partition.refresh-interval' = '%s',
"
+ + "'continuous.discovery-interval'='1 ms')",
+ refreshPartitionQuickly ? "10 ms" : "99999 s");
sql(
"INSERT INTO PARTITIONED_DIM VALUES (1, 1, 101), (1, 2, 102),
(2, 1, 201), (2, 2, 202)");
@@ -1192,6 +1194,28 @@ public class LookupJoinITCase extends CatalogITCaseBase {
result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212),
Row.of(3, 213));
+
+ // overwrite old partition
+ sql(
+ "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES
(1, 111), (2, 112), (3, 113)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212),
Row.of(3, 213));
+
+ // overwrite new MAX partition
+ sql(
+ "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 3) VALUES
(1, 301), (2, 302), (3, 303)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ result = iterator.collect(3);
+ if (refreshPartitionQuickly) {
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 301), Row.of(2, 302),
Row.of(3, 303));
+ } else {
+ // MAX_PT isn't changed
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212),
Row.of(3, 213));
+ }
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 4a3ca45c3b..ed172b5e58 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -78,6 +78,7 @@ import static
org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.FULL
import static
org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY;
import static org.apache.paimon.types.DataTypes.INT;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link LookupTable}. */
@@ -115,17 +116,18 @@ public class LookupTableTest extends TableTestBase {
}
private FileStoreTable createTable(List<String> primaryKeys, Options
options) throws Exception {
+ return createTable(emptyList(), primaryKeys, options);
+ }
+
+ private FileStoreTable createTable(
+ List<String> partitionKeys, List<String> primaryKeys, Options
options)
+ throws Exception {
if (inMemory) {
options.set(LOOKUP_CACHE_MODE, MEMORY);
}
Identifier identifier = new Identifier("default", "t");
Schema schema =
- new Schema(
- rowType.getFields(),
- Collections.emptyList(),
- primaryKeys,
- options.toMap(),
- null);
+ new Schema(rowType.getFields(), partitionKeys, primaryKeys,
options.toMap(), null);
catalog.createTable(identifier, schema, false);
return (FileStoreTable) catalog.getTable(identifier);
}
@@ -1028,6 +1030,70 @@ public class LookupTableTest extends TableTestBase {
assertRow(result.get(0), 1, -1, 111);
}
+ @TestTemplate
+ public void testLookupDataTableScanHandleOverwrite() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ FileStoreTable storeTable =
+ createTable(singletonList("f0"), Arrays.asList("f0", "f1"),
options);
+ LookupFileStoreTable lookupFileStoreTable =
+ new LookupFileStoreTable(storeTable, Arrays.asList("f0",
"f1"));
+
+ LookupDataTableScan scan = (LookupDataTableScan)
lookupFileStoreTable.newStreamScan();
+
+ write(storeTable, (IOManager) null, GenericRow.of(1, 1, 1),
GenericRow.of(2, 1, 1));
+
+ BatchWriteBuilder writeBuilder =
+ storeTable
+ .newBatchWriteBuilder()
+ .withOverwrite(Collections.singletonMap("f0", "1"));
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, 2, 2));
+ commit.commit(write.prepareCommit());
+ }
+
+ // didn't specify partitions
+ assertThatThrownBy(
+ () ->
+ scan.handleOverwriteSnapshot(
+
storeTable.snapshotManager().latestSnapshot()))
+ .isInstanceOf(ReopenException.class);
+
+ // specify partition f0 = 1
+
scan.setScanPartitions(Collections.singletonList(BinaryRow.singleColumn(1)));
+ assertThatThrownBy(
+ () ->
+ scan.handleOverwriteSnapshot(
+
storeTable.snapshotManager().latestSnapshot()))
+ .isInstanceOf(ReopenException.class);
+
+ // specify partition f0 = 2
+
scan.setScanPartitions(Collections.singletonList(BinaryRow.singleColumn(2)));
+ assertThatCode(
+ () ->
+ scan.handleOverwriteSnapshot(
+
storeTable.snapshotManager().latestSnapshot()))
+ .doesNotThrowAnyException();
+
+ // overwrite partition f0 = 3 and reopen
+
scan.setScanPartitions(Collections.singletonList(BinaryRow.singleColumn(3)));
+ writeBuilder =
+ storeTable
+ .newBatchWriteBuilder()
+ .withOverwrite(Collections.singletonMap("f0", "3"));
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(3, 1, 1));
+ commit.commit(write.prepareCommit());
+ }
+ assertThatThrownBy(
+ () ->
+ scan.handleOverwriteSnapshot(
+
storeTable.snapshotManager().latestSnapshot()))
+ .isInstanceOf(ReopenException.class);
+ }
+
private FileStoreTable createDimTable() throws Exception {
FileIO fileIO = LocalFileIO.create();
org.apache.paimon.fs.Path tablePath =