This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fc2ee7cba5 [flink] Don't reopen the dim table if the scanned partitions are not overwritten (#6596)
fc2ee7cba5 is described below

commit fc2ee7cba515b960cb8e7f0cf3627d12a18d75d0
Author: yuzelin <[email protected]>
AuthorDate: Tue Nov 25 15:12:07 2025 +0800

    [flink] Don't reopen the dim table if the scanned partitions are not overwritten (#6596)
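
    Previously, every OVERWRITE snapshot forced the dim (lookup) table to be
    reopened. LookupDataTableScan now knows which partitions are being scanned
    (passed through LookupTable#specifyPartitions) and first inspects the delta
    manifests of an OVERWRITE snapshot, reopening only when one of the scanned
    partitions was actually overwritten. A condensed sketch of the new check
    (simplified from the LookupDataTableScan change below, not the verbatim
    method):

        private boolean shouldReopen(Snapshot snapshot) {
            // No partition scope declared: stay conservative and reopen.
            if (scanPartitions == null) {
                return true;
            }
            // Read only the delta manifests of the OVERWRITE snapshot and
            // reopen iff an entry touches one of the scanned partitions.
            for (ManifestFileMeta manifest :
                    table.manifestListReader().read(snapshot.deltaManifestList())) {
                for (ManifestEntry entry :
                        table.manifestFileReader().read(manifest.fileName())) {
                    if (scanPartitions.contains(entry.partition())) {
                        return true;
                    }
                }
            }
            return false;
        }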
---
 .../flink/lookup/FileStoreLookupFunction.java      |  6 +-
 .../paimon/flink/lookup/FullCacheLookupTable.java  | 14 ++--
 .../paimon/flink/lookup/LookupDataTableScan.java   | 74 ++++++++++++++------
 .../paimon/flink/lookup/LookupFileStoreTable.java  | 11 +--
 .../paimon/flink/lookup/LookupStreamingReader.java | 10 +--
 .../apache/paimon/flink/lookup/LookupTable.java    |  5 +-
 .../flink/lookup/PrimaryKeyPartialLookupTable.java |  9 +--
 .../org/apache/paimon/flink/LookupJoinITCase.java  | 32 +++++++--
 .../paimon/flink/lookup/LookupTableTest.java       | 78 ++++++++++++++++++++--
 9 files changed, 182 insertions(+), 57 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 829209dbb6..7df4742101 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -226,7 +226,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
             partitionLoader.checkRefresh();
             List<BinaryRow> partitions = partitionLoader.partitions();
             if (!partitions.isEmpty()) {
-                lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
+                lookupTable.specifyPartitions(
+                        partitions, partitionLoader.createSpecificPartFilter());
             }
         }
 
@@ -327,7 +328,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
 
             if (partitionChanged) {
                 // reopen with latest partition
-                lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
+                lookupTable.specifyPartitions(
+                        partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
                 lookupTable.close();
                 lookupTable.open();
                 // no need to refresh the lookup table because it is reopened
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index bbae15d07c..cda824cc94 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -92,7 +92,8 @@ public abstract class FullCacheLookupTable implements LookupTable {
     private final FileStoreTable table;
     private Future<?> refreshFuture;
     private LookupStreamingReader reader;
-    private Predicate specificPartition;
+    @Nullable private List<BinaryRow> scanPartitions;
+    @Nullable private Predicate partitionFilter;
     @Nullable private Filter<InternalRow> cacheRowFilter;
 
     public FullCacheLookupTable(Context context) {
@@ -139,8 +140,10 @@ public abstract class FullCacheLookupTable implements LookupTable {
     }
 
     @Override
-    public void specificPartitionFilter(Predicate filter) {
-        this.specificPartition = filter;
+    public void specifyPartitions(
+            List<BinaryRow> scanPartitions, @Nullable Predicate partitionFilter) {
+        this.scanPartitions = scanPartitions;
+        this.partitionFilter = partitionFilter;
     }
 
     @Override
@@ -172,14 +175,15 @@ public abstract class FullCacheLookupTable implements LookupTable {
 
     protected void bootstrap() throws Exception {
         Predicate scanPredicate =
-                PredicateBuilder.andNullable(context.tablePredicate, specificPartition);
+                PredicateBuilder.andNullable(context.tablePredicate, partitionFilter);
         this.reader =
                 new LookupStreamingReader(
                         context.table,
                         context.projection,
                         scanPredicate,
                         context.requiredCachedBucketIds,
-                        cacheRowFilter);
+                        cacheRowFilter,
+                        scanPartitions);
         if (!stateFactory.preferBulkLoad()) {
             doRefresh();
             return;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index c1fd45f757..b64e7edace 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -20,24 +20,27 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataTableStreamScan;
-import org.apache.paimon.table.source.TableQueryAuth;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.FullStartingScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.SimpleFileReader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+
 import static org.apache.paimon.CoreOptions.StartupMode;
 import static org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamScanMode;
 
@@ -49,29 +52,28 @@ public class LookupDataTableScan extends DataTableStreamScan {
 
     private static final Logger LOG = LoggerFactory.getLogger(LookupDataTableScan.class);
 
+    private final FileStoreTable table;
     private final StartupMode startupMode;
     private final LookupStreamScanMode lookupScanMode;
 
+    @Nullable private List<BinaryRow> scanPartitions = null;
+
     public LookupDataTableScan(
-            TableSchema schema,
-            CoreOptions options,
+            FileStoreTable table,
             SnapshotReader snapshotReader,
-            SnapshotManager snapshotManager,
-            ChangelogManager changelogManager,
-            boolean supportStreamingReadOverwrite,
-            LookupStreamScanMode lookupScanMode,
-            TableQueryAuth queryAuth,
-            boolean hasPk) {
+            LookupStreamScanMode lookupScanMode) {
         super(
-                schema,
-                options,
+                table.schema(),
+                table.coreOptions(),
                 snapshotReader,
-                snapshotManager,
-                changelogManager,
-                supportStreamingReadOverwrite,
-                queryAuth,
-                hasPk);
+                table.snapshotManager(),
+                table.changelogManager(),
+                table.supportStreamingReadOverwrite(),
+                table.catalogEnvironment().tableQueryAuth(table.coreOptions()),
+                !table.schema().primaryKeys().isEmpty());
+        CoreOptions options = table.coreOptions();
 
+        this.table = table;
         this.startupMode = options.startupMode();
         this.lookupScanMode = lookupScanMode;
         dropStats();
@@ -81,6 +83,10 @@ public class LookupDataTableScan extends DataTableStreamScan {
         }
     }
 
+    public void setScanPartitions(List<BinaryRow> scanPartitions) {
+        this.scanPartitions = scanPartitions;
+    }
+
     @Override
     @Nullable
     protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
@@ -88,8 +94,34 @@ public class LookupDataTableScan extends DataTableStreamScan {
         if (plan != null) {
             return plan;
         }
-        LOG.info("Dim table found OVERWRITE snapshot {}, reopen.", snapshot.id());
-        throw new ReopenException();
+
+        if (shouldReopen(snapshot)) {
+            LOG.info("Dim table found OVERWRITE snapshot {}, reopen.", snapshot.id());
+            throw new ReopenException();
+        } else {
+            return null;
+        }
+    }
+
+    private boolean shouldReopen(Snapshot snapshot) {
+        if (scanPartitions == null) {
+            return true;
+        }
+
+        List<ManifestFileMeta> manifests =
+                table.manifestListReader().read(snapshot.deltaManifestList());
+        SimpleFileReader<ManifestEntry> manifestReader = table.manifestFileReader();
+        for (ManifestFileMeta manifest : manifests) {
+            List<ManifestEntry> entries = manifestReader.read(manifest.fileName());
+            for (ManifestEntry entry : entries) {
+                BinaryRow partition = entry.partition();
+                if (scanPartitions.contains(partition)) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
index 3c2fcb6a83..353c99d2b1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
@@ -73,16 +73,7 @@ public class LookupFileStoreTable extends DelegatedFileStoreTable {
 
     @Override
     public StreamDataTableScan newStreamScan() {
-        return new LookupDataTableScan(
-                wrapped.schema(),
-                wrapped.coreOptions(),
-                wrapped.newSnapshotReader(),
-                wrapped.snapshotManager(),
-                wrapped.changelogManager(),
-                wrapped.supportStreamingReadOverwrite(),
-                lookupScanMode,
-                wrapped.catalogEnvironment().tableQueryAuth(wrapped.coreOptions()),
-                !wrapped.schema().primaryKeys().isEmpty());
+        return new LookupDataTableScan(wrapped, wrapped.newSnapshotReader(), lookupScanMode);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index adc3e19b52..ac0f00d895 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.SplitsParallelReadUtil;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
@@ -29,7 +30,6 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.FunctionWithIOException;
@@ -61,14 +61,15 @@ public class LookupStreamingReader {
     @Nullable private final Filter<InternalRow> cacheRowFilter;
     private final ReadBuilder readBuilder;
     @Nullable private final Predicate projectedPredicate;
-    private final StreamTableScan scan;
+    private final LookupDataTableScan scan;
 
     public LookupStreamingReader(
             LookupFileStoreTable table,
             int[] projection,
             @Nullable Predicate predicate,
             Set<Integer> requireCachedBucketIds,
-            @Nullable Filter<InternalRow> cacheRowFilter) {
+            @Nullable Filter<InternalRow> cacheRowFilter,
+            @Nullable List<BinaryRow> scanPartitions) {
         this.table = table;
         this.projection = projection;
         this.cacheRowFilter = cacheRowFilter;
@@ -81,7 +82,8 @@ public class LookupStreamingReader {
                                 requireCachedBucketIds == null
                                         ? null
                                         : requireCachedBucketIds::contains);
-        scan = readBuilder.newStreamScan();
+        scan = (LookupDataTableScan) readBuilder.newStreamScan();
+        scan.setScanPartitions(scanPartitions);
 
         if (predicate != null) {
             List<String> fieldNames = table.rowType().getFieldNames();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
index 8ea1931b96..f8af411941 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java
@@ -18,10 +18,13 @@
 
 package org.apache.paimon.flink.lookup;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.utils.Filter;
 
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -29,7 +32,7 @@ import java.util.List;
 /** A lookup table which provides get and refresh. */
 public interface LookupTable extends Closeable {
 
-    void specificPartitionFilter(Predicate filter);
+    void specifyPartitions(List<BinaryRow> scanPartitions, @Nullable Predicate partitionFilter);
 
     void open() throws Exception;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 886fdc8795..ded0f29da7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -63,7 +63,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
     @Nullable private final ProjectedRow keyRearrange;
     @Nullable private final ProjectedRow trimmedKeyRearrange;
 
-    private Predicate specificPartition;
+    @Nullable private Predicate partitionFilter;
     @Nullable private Filter<InternalRow> cacheRowFilter;
     private QueryExecutor queryExecutor;
 
@@ -143,13 +143,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
     }
 
     @Override
-    public void specificPartitionFilter(Predicate filter) {
-        this.specificPartition = filter;
+    public void specifyPartitions(
+            List<BinaryRow> scanPartitions, @Nullable Predicate partitionFilter) {
+        this.partitionFilter = partitionFilter;
     }
 
     @Override
     public void open() throws Exception {
-        this.queryExecutor = executorFactory.create(specificPartition, cacheRowFilter);
+        this.queryExecutor = executorFactory.create(partitionFilter, cacheRowFilter);
         refresh();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index dfa6907c91..408d74ea0c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1163,16 +1163,18 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         iterator.close();
     }
 
-    @Test
-    public void testMaxPtAndOverwrite() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMaxPtAndOverwrite(boolean refreshPartitionQuickly) throws Exception {
         sql(
                 "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT) "
                         + "PARTITIONED BY (`pt`) WITH ("
                         + "'bucket' = '2', "
                         + "'bucket-key' = 'k', "
                         + "'lookup.dynamic-partition' = 'max_pt()', "
-                        + "'lookup.dynamic-partition.refresh-interval' = '99999 s', "
-                        + "'continuous.discovery-interval'='1 ms')");
+                        + "'lookup.dynamic-partition.refresh-interval' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                refreshPartitionQuickly ? "10 ms" : "99999 s");
         sql(
                 "INSERT INTO PARTITIONED_DIM VALUES (1, 1, 101), (1, 2, 102), 
(2, 1, 201), (2, 2, 202)");
 
@@ -1192,6 +1194,28 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), Row.of(3, 213));
+
+        // overwrite old partition
+        sql(
+                "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES 
(1, 111), (2, 112), (3, 113)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), Row.of(3, 213));
+
+        // overwrite new MAX partition
+        sql(
+                "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 3) VALUES 
(1, 301), (2, 302), (3, 303)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        result = iterator.collect(3);
+        if (refreshPartitionQuickly) {
+            assertThat(result)
+                    .containsExactlyInAnyOrder(Row.of(1, 301), Row.of(2, 302), Row.of(3, 303));
+        } else {
+            // MAX_PT isn't changed
+            assertThat(result)
+                    .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), Row.of(3, 213));
+        }
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 4a3ca45c3b..ed172b5e58 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -78,6 +78,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.FULL
 import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY;
 import static org.apache.paimon.types.DataTypes.INT;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link LookupTable}. */
@@ -115,17 +116,18 @@ public class LookupTableTest extends TableTestBase {
     }
 
     private FileStoreTable createTable(List<String> primaryKeys, Options options) throws Exception {
+        return createTable(emptyList(), primaryKeys, options);
+    }
+
+    private FileStoreTable createTable(
+            List<String> partitionKeys, List<String> primaryKeys, Options options)
+            throws Exception {
         if (inMemory) {
             options.set(LOOKUP_CACHE_MODE, MEMORY);
         }
         Identifier identifier = new Identifier("default", "t");
         Schema schema =
-                new Schema(
-                        rowType.getFields(),
-                        Collections.emptyList(),
-                        primaryKeys,
-                        options.toMap(),
-                        null);
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), null);
         catalog.createTable(identifier, schema, false);
         return (FileStoreTable) catalog.getTable(identifier);
     }
@@ -1028,6 +1030,70 @@ public class LookupTableTest extends TableTestBase {
         assertRow(result.get(0), 1, -1, 111);
     }
 
+    @TestTemplate
+    public void testLookupDataTableScanHandleOverwrite() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, 1);
+        FileStoreTable storeTable =
+                createTable(singletonList("f0"), Arrays.asList("f0", "f1"), options);
+        LookupFileStoreTable lookupFileStoreTable =
+                new LookupFileStoreTable(storeTable, Arrays.asList("f0", "f1"));
+
+        LookupDataTableScan scan = (LookupDataTableScan) lookupFileStoreTable.newStreamScan();
+
+        write(storeTable, (IOManager) null, GenericRow.of(1, 1, 1), GenericRow.of(2, 1, 1));
+
+        BatchWriteBuilder writeBuilder =
+                storeTable
+                        .newBatchWriteBuilder()
+                        .withOverwrite(Collections.singletonMap("f0", "1"));
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 2, 2));
+            commit.commit(write.prepareCommit());
+        }
+
+        // didn't specify partitions
+        assertThatThrownBy(
+                        () ->
+                                scan.handleOverwriteSnapshot(
+                                        storeTable.snapshotManager().latestSnapshot()))
+                .isInstanceOf(ReopenException.class);
+
+        // specify partition f0 = 1
+        scan.setScanPartitions(Collections.singletonList(BinaryRow.singleColumn(1)));
+        assertThatThrownBy(
+                        () ->
+                                scan.handleOverwriteSnapshot(
+                                        storeTable.snapshotManager().latestSnapshot()))
+                .isInstanceOf(ReopenException.class);
+
+        // specify partition f0 = 2
+        scan.setScanPartitions(Collections.singletonList(BinaryRow.singleColumn(2)));
+        assertThatCode(
+                        () ->
+                                scan.handleOverwriteSnapshot(
+                                        storeTable.snapshotManager().latestSnapshot()))
+                .doesNotThrowAnyException();
+
+        // overwrite partition f0 = 3 and reopen
+        scan.setScanPartitions(Collections.singletonList(BinaryRow.singleColumn(3)));
+        writeBuilder =
+                storeTable
+                        .newBatchWriteBuilder()
+                        .withOverwrite(Collections.singletonMap("f0", "3"));
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(3, 1, 1));
+            commit.commit(write.prepareCommit());
+        }
+        assertThatThrownBy(
+                        () ->
+                                scan.handleOverwriteSnapshot(
+                                        storeTable.snapshotManager().latestSnapshot()))
+                .isInstanceOf(ReopenException.class);
+    }
+
     private FileStoreTable createDimTable() throws Exception {
         FileIO fileIO = LocalFileIO.create();
         org.apache.paimon.fs.Path tablePath =
