This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cc7cf2961 [flink] support detecting fields changes for dedicated compaction in streaming mode (#6690)
1cc7cf2961 is described below

commit 1cc7cf2961277c53a0b26519eef99bfc8bed1450
Author: LsomeYeah <[email protected]>
AuthorDate: Wed Dec 17 10:18:53 2025 +0800

    [flink] support detecting fields changes for dedicated compaction in streaming mode (#6690)
---
 .../paimon/flink/compact/AppendTableCompactor.java |  13 +-
 .../flink/sink/AppendCompactWorkerOperator.java    |   5 +-
 ...pendOnlyMultiTableCompactionWorkerOperator.java |   7 +-
 ...endOnlySingleTableCompactionWorkerOperator.java |   4 +-
 .../apache/paimon/flink/sink/CompactRefresher.java | 106 ++++++++++++
 .../{WriterRefresher.java => ConfigRefresher.java} |  43 ++---
 .../paimon/flink/sink/StoreCompactOperator.java    |  17 +-
 .../paimon/flink/sink/TableWriteOperator.java      |   8 +-
 .../apache/paimon/flink/sink/WriteRefresher.java   |  26 +++
 .../paimon/flink/action/CompactActionITCase.java   | 108 +++++++++++++
 .../flink/action/CompactActionITCaseBase.java      |  17 ++
 .../paimon/flink/sink/CompactRefresherTest.java    | 177 +++++++++++++++++++++
 ...RefresherTest.java => ConfigRefresherTest.java} |  60 ++++---
 13 files changed, 521 insertions(+), 70 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
index 1efbccbc19..0a45514669 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
@@ -25,7 +25,8 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.WriterRefresher;
+import org.apache.paimon.flink.sink.CompactRefresher;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite.State;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -65,7 +66,7 @@ public class AppendTableCompactor {
     @Nullable private final CompactionMetrics compactionMetrics;
     @Nullable private final CompactionMetrics.Reporter metricsReporter;
 
-    @Nullable protected final WriterRefresher writeRefresher;
+    @Nullable protected final CompactRefresher compactRefresher;
 
     public AppendTableCompactor(
             FileStoreTable table,
@@ -91,7 +92,7 @@ public class AppendTableCompactor {
                         ? null
                         // partition and bucket fields are no use.
                        : this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
-        this.writeRefresher = WriterRefresher.create(isStreaming, table, this::replace);
+        this.compactRefresher = CompactRefresher.create(isStreaming, table, this::replace);
     }
 
     public void processElement(AppendCompactTask task) throws Exception {
@@ -219,12 +220,12 @@ public class AppendTableCompactor {
         this.write.restore(states);
     }
 
-    public void tryRefreshWrite() {
+    public void tryRefreshWrite(List<DataFileMeta> files) {
         if (commitUser == null) {
             return;
         }
-        if (writeRefresher != null) {
-            writeRefresher.tryRefresh();
+        if (compactRefresher != null && (!files.isEmpty())) {
+            compactRefresher.tryRefresh(files);
         }
     }
 }
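
For reference, the refresh decision now travels with each compaction task instead of happening at prepareCommit time. A minimal caller sketch (hypothetical variable names; the real wiring is in the worker operators below):

    // hedged sketch (hypothetical names): refresh from the task's "before" files
    AppendCompactTask task = element.getValue();
    compactor.tryRefreshWrite(task.compactBefore()); // no-op on an empty file list
    compactor.processElement(task);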
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
index aa912a9eb3..e7f6ece177 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
@@ -84,10 +84,7 @@ public abstract class AppendCompactWorkerOperator<IN>
     @Override
     protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException {
-        List<Committable> committables =
-                this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
-        this.unawareBucketCompactor.tryRefreshWrite();
-        return committables;
+        return this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
     }
 
     private ExecutorService workerExecutor() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
index 4febb8d907..3450d80c30 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -112,9 +112,10 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
     @Override
     public void processElement(StreamRecord<MultiTableAppendCompactTask> element) throws Exception {
         Identifier identifier = element.getValue().tableIdentifier();
-        compactorContainer
-                .computeIfAbsent(identifier, this::compactor)
-                .processElement(element.getValue());
+        AppendTableCompactor compactor =
+                compactorContainer.computeIfAbsent(identifier, this::compactor);
+        compactor.tryRefreshWrite(element.getValue().compactBefore());
+        compactor.processElement(element.getValue());
     }
 
     private AppendTableCompactor compactor(Identifier tableId) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
index c1a4ce82da..6ad1de669b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
@@ -44,7 +44,9 @@ public class AppendOnlySingleTableCompactionWorkerOperator
 
     @Override
     public void processElement(StreamRecord<AppendCompactTask> element) throws Exception {
-        this.unawareBucketCompactor.processElement(element.getValue());
+        AppendCompactTask task = element.getValue();
+        this.unawareBucketCompactor.tryRefreshWrite(task.compactBefore());
+        this.unawareBucketCompactor.processElement(task);
     }
 
     /** {@link StreamOperatorFactory} of {@link AppendOnlySingleTableCompactionWorkerOperator}. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactRefresher.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactRefresher.java
new file mode 100644
index 0000000000..e59e086754
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactRefresher.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.flink.sink.ConfigRefresher.configGroups;
+
+/** Refreshes the write when the table schema changes. This is for dedicated compaction. */
+public class CompactRefresher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactRefresher.class);
+
+    private FileStoreTable table;
+    private final WriteRefresher refresher;
+    private final @Nullable ConfigRefresher configRefresher;
+
+    private CompactRefresher(FileStoreTable table, WriteRefresher refresher) {
+        this.table = table;
+        this.refresher = refresher;
+        this.configRefresher = ConfigRefresher.create(true, table, refresher);
+    }
+
+    @Nullable
+    public static CompactRefresher create(
+            boolean isStreaming, FileStoreTable table, WriteRefresher refresher) {
+        if (!isStreaming) {
+            return null;
+        }
+        return new CompactRefresher(table, refresher);
+    }
+
+    /**
+     * This is used for dedicated compaction in streaming mode. When the schema-id of newly added
+     * data files exceeds the current schema-id, the writer needs to be refreshed to prevent data
+     * loss.
+     */
+    public void tryRefresh(List<DataFileMeta> files) {
+        long fileSchemaId =
+                files.stream().mapToLong(DataFileMeta::schemaId).max().orElse(table.schema().id());
+        if (fileSchemaId > table.schema().id()) {
+            Optional<TableSchema> latestSchema = table.schemaManager().latest();
+            if (!latestSchema.isPresent()) {
+                return;
+            }
+            TableSchema latest = latestSchema.get();
+
+            try {
+                // here we cannot use table.copy(latestSchema), because the table used for
+                // dedicated compaction has some dynamic options that we should not overwrite.
+                // we just copy the latest fields and the options allowed to be refreshed.
+                table = table.copyWithLatestSchema();
+
+                // also refresh the configs that are allowed to be updated
+                if (configRefresher != null) {
+                    table =
+                            table.copy(
+                                    configGroups(
+                                            configRefresher.configGroups(),
+                                            CoreOptions.fromMap(latest.options()));
+                    configRefresher.updateTable(table);
+                }
+
+                refresher.refresh(table);
+                LOG.info(
+                        "write has been refreshed because the schema in data files changed. new schema id: {}.",
+                        table.schema().id());
+            } catch (Exception e) {
+                throw new RuntimeException("update write failed.", e);
+            }
+
+        } else {
+            // try refresh for configs
+            if (configRefresher != null) {
+                configRefresher.tryRefresh();
+            }
+        }
+    }
+}
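
As a usage sketch (not part of this commit; the host class here is hypothetical), an operator wires the refresher up roughly as follows, mirroring StoreCompactOperator further below:

    import org.apache.paimon.flink.sink.CompactRefresher;
    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.table.FileStoreTable;

    import javax.annotation.Nullable;

    import java.util.List;

    // hypothetical host class, for illustration only
    class CompactRefresherUsage {

        // null in batch mode: create() returns null when isStreaming is false
        @Nullable private final CompactRefresher compactRefresher;

        CompactRefresherUsage(FileStoreTable table, boolean isStreaming) {
            this.compactRefresher = CompactRefresher.create(isStreaming, table, this::replace);
        }

        // called with the data files of each incoming compaction task
        void tryRefreshWrite(List<DataFileMeta> files) {
            if (compactRefresher != null && !files.isEmpty()) {
                compactRefresher.tryRefresh(files);
            }
        }

        // WriteRefresher callback: rebuild the underlying write against the refreshed table
        private void replace(FileStoreTable refreshedTable) throws Exception {
            // ... recreate writers here, e.g. write.replace(refreshedTable)
        }
    }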
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ConfigRefresher.java
similarity index 77%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ConfigRefresher.java
index 5dca871af5..1d4b8deb08 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ConfigRefresher.java
@@ -42,24 +42,25 @@ import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS
 import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
-/** Writer refresher for refresh write when configs changed. */
-public class WriterRefresher {
+/** Refreshes the write when configs change. */
+public class ConfigRefresher {
 
-    private static final Logger LOG = LoggerFactory.getLogger(WriterRefresher.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigRefresher.class);
 
     private FileStoreTable table;
-    private final Refresher refresher;
+    private final WriteRefresher refresher;
     private final Set<String> configGroups;
 
-    private WriterRefresher(FileStoreTable table, Refresher refresher, Set<String> configGroups) {
+    private ConfigRefresher(
+            FileStoreTable table, WriteRefresher refresher, Set<String> configGroups) {
         this.table = table;
         this.refresher = refresher;
         this.configGroups = configGroups;
     }
 
     @Nullable
-    public static WriterRefresher create(
-            boolean isStreaming, FileStoreTable table, Refresher refresher) {
+    public static ConfigRefresher create(
+            boolean isStreaming, FileStoreTable table, WriteRefresher refresher) {
         if (!isStreaming) {
             return null;
         }
@@ -74,9 +75,13 @@ public class WriterRefresher {
         if (configGroups == null || configGroups.isEmpty()) {
             return null;
         }
-        return new WriterRefresher(table, refresher, configGroups);
+        return new ConfigRefresher(table, refresher, configGroups);
     }
 
+    /**
+     * Try to refresh the write when configs that are expected to be refreshed in streaming mode
+     * have changed.
+     */
     public void tryRefresh() {
         Optional<TableSchema> latestSchema = table.schemaManager().latest();
         if (!latestSchema.isPresent()) {
@@ -92,17 +97,12 @@ public class WriterRefresher {
                        configGroups(configGroups, CoreOptions.fromMap(latest.options()));
 
                 if (!Objects.equals(newOptions, currentOptions)) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(
-                                "table schema has changed, current schema-id:{}, try to update write with new schema-id:{}. "
-                                        + "current options:{}, new options:{}.",
-                                table.schema().id(),
-                                latestSchema.get().id(),
-                                currentOptions,
-                                newOptions);
-                    }
                     table = table.copy(newOptions);
                     refresher.refresh(table);
+                    LOG.info(
+                            "write has been refreshed because configs changed. old options: {}, new options: {}.",
+                            currentOptions,
+                            newOptions);
                 }
             } catch (Exception e) {
                 throw new RuntimeException("update write failed.", e);
@@ -110,9 +110,12 @@ public class WriterRefresher {
         }
     }
 
-    /** Refresher when configs changed. */
-    public interface Refresher {
-        void refresh(FileStoreTable table) throws Exception;
+    public void updateTable(FileStoreTable table) {
+        this.table = table;
+    }
+
+    public Set<String> configGroups() {
+        return configGroups;
     }
 
     public static Map<String, String> configGroups(Set<String> groups, CoreOptions options) {
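
For context, a hedged sketch of how the configGroups helper (its body is cut off by the hunk above) is used at the call sites in this change: it extracts only the options belonging to the monitored groups, so copying them onto the table overwrites nothing else. The wrapper class and method names here are illustrative:

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.flink.sink.ConfigRefresher;
    import org.apache.paimon.schema.TableSchema;
    import org.apache.paimon.table.FileStoreTable;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;

    class ConfigGroupsUsage {
        // illustrative: copy only the monitored option groups onto the table
        static FileStoreTable refreshOptions(FileStoreTable table, TableSchema latestSchema) {
            Set<String> groups = Collections.singleton("external-paths"); // monitored groups
            Map<String, String> refreshable =
                    ConfigRefresher.configGroups(groups, CoreOptions.fromMap(latestSchema.options()));
            // other (dynamic) options on the table stay untouched
            return table.copy(refreshable);
        }
    }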
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 989c8e91dc..0b7984d2f4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -70,7 +70,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
     private transient DataFileMetaSerializer dataFileMetaSerializer;
     private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
 
-    protected transient @Nullable WriterRefresher writeRefresher;
+    protected transient @Nullable CompactRefresher compactRefresher;
 
     public StoreCompactOperator(
             StreamOperatorParameters<Committable> parameters,
@@ -119,7 +119,8 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
                         getContainingTask().getEnvironment().getIOManager(),
                         memoryPoolFactory,
                         getMetricGroup());
-        this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
+        this.compactRefresher =
+                CompactRefresher.create(write.streamingMode(), table, write::replace);
     }
 
     @Override
@@ -150,6 +151,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
 
         if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
+            tryRefreshWrite(files);
         } else {
             Preconditions.checkArgument(
                     files.isEmpty(),
@@ -173,10 +175,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
         }
         waitToCompact.clear();
 
-        List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
-
-        tryRefreshWrite();
-        return committables;
+        return write.prepareCommit(waitCompaction, checkpointId);
     }
 
     @Override
@@ -197,9 +196,9 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
         return waitToCompact;
     }
 
-    private void tryRefreshWrite() {
-        if (writeRefresher != null) {
-            writeRefresher.tryRefresh();
+    private void tryRefreshWrite(List<DataFileMeta> files) {
+        if (compactRefresher != null) {
+            compactRefresher.tryRefresh(files);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index caf92cb742..70a6af8a25 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -58,7 +58,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
     protected transient StoreSinkWriteState state;
     protected transient StoreSinkWrite write;
 
-    protected transient @Nullable WriterRefresher writeRefresher;
+    protected transient @Nullable ConfigRefresher configRefresher;
 
     public TableWriteOperator(
             StreamOperatorParameters<Committable> parameters,
@@ -99,7 +99,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
         if (writeRestore != null) {
             write.setWriteRestore(writeRestore);
         }
-        this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace);
+        this.configRefresher = ConfigRefresher.create(write.streamingMode(), table, write::replace);
     }
 
     public void setWriteRestore(@Nullable WriteRestore writeRestore) {
@@ -157,8 +157,8 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
     }
 
     protected void tryRefreshWrite() {
-        if (writeRefresher != null) {
-            writeRefresher.tryRefresh();
+        if (configRefresher != null) {
+            configRefresher.tryRefresh();
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriteRefresher.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriteRefresher.java
new file mode 100644
index 0000000000..83fb376556
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriteRefresher.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.FileStoreTable;
+
+/** A refresher for refreshing the write in streaming mode. */
+public interface WriteRefresher {
+    void refresh(FileStoreTable table) throws Exception;
+}
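
WriteRefresher is a single-method interface, so callers in this change pass method references such as write::replace. An equivalent explicit lambda, assuming a write object exposing replace(FileStoreTable) as the operators above do:

    // hedged sketch: equivalent to the write::replace method references used in this change
    WriteRefresher refresher = refreshedTable -> write.replace(refreshedTable);
    // the refresher is then handed to ConfigRefresher.create(...) or CompactRefresher.create(...)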
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index b5409e22af..fb941a73d5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -26,9 +26,11 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TraceableFileIO;
@@ -367,6 +369,62 @@ public class CompactActionITCase extends CompactActionITCaseBase {
         assertThat(fileIO.listStatus(new Path(externalPath2)).length).isGreaterThanOrEqualTo(1);
     }
 
+    @Test
+    public void testStreamingCompactWithAddingColumns() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
+        tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        FileStoreTable table =
+                prepareTable(
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        Collections.emptyList(),
+                        tableOptions);
+
+        // base records
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        checkLatestSnapshot(table, 1, Snapshot.CommitKind.APPEND);
+        runAction(true);
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.COMPACT, 60_000);
+
+        SchemaChange schemaChange = SchemaChange.addColumn("new_col", DataTypes.INT());
+        table.schemaManager().commitChanges(schemaChange);
+        // wait for at least one checkpoint so that the writer can refresh
+        Thread.sleep(1000);
+
+        // incremental records
+        table = getFileStoreTable(tableName);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+        writeData(
+                rowData(1, 101, 15, BinaryString.fromString("20221208"), 1),
+                rowData(1, 101, 16, BinaryString.fromString("20221208"), 1),
+                rowData(2, 101, 15, BinaryString.fromString("20221209"), 2));
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.APPEND);
+        checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT, 60_000);
+
+        List<String> res =
+                getResult(
+                        table.newRead(),
+                        table.newSnapshotReader().read().splits(),
+                        table.rowType());
+        assertThat(res)
+                .containsExactlyInAnyOrder(
+                        "+I[1, 101, 15, 20221208, 1]",
+                        "+I[1, 101, 16, 20221208, 1]",
+                        "+I[1, 100, 15, 20221209, NULL]",
+                        "+I[2, 101, 15, 20221209, 2]");
+    }
+
     @Test
     public void testUnawareBucketStreamingCompact() throws Exception {
         Map<String, String> tableOptions = new HashMap<>();
@@ -474,6 +532,56 @@ public class CompactActionITCase extends CompactActionITCaseBase {
         assertThat(fileIO.listStatus(new Path(externalPath2)).length).isGreaterThanOrEqualTo(1);
     }
 
+    @Test
+    public void testUnawareBucketStreamingCompactWithAddingColumns() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        FileStoreTable table =
+                prepareTable(
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        tableOptions);
+
+        // base records
+        writeData(rowData(1, 100, 15, BinaryString.fromString("20221208")));
+        writeData(rowData(1, 100, 16, BinaryString.fromString("20221208")));
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+        // note: this test does not specify the real partition of the table
+        runActionForUnawareTable(true);
+
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT, 60_000);
+
+        SchemaChange schemaChange = SchemaChange.addColumn("new_col", DataTypes.INT());
+        table.schemaManager().commitChanges(schemaChange);
+        Thread.sleep(1000);
+
+        table = getFileStoreTable(tableName);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+        writeData(rowData(1, 101, 15, BinaryString.fromString("20221208"), 1));
+        checkLatestSnapshot(table, 4, Snapshot.CommitKind.APPEND);
+        checkLatestSnapshot(table, 5, Snapshot.CommitKind.COMPACT, 60_000);
+
+        List<String> res =
+                getResult(
+                        table.newRead(),
+                        table.newSnapshotReader().read().splits(),
+                        table.rowType());
+        assertThat(res)
+                .containsExactlyInAnyOrder(
+                        "+I[1, 100, 15, 20221208, NULL]",
+                        "+I[1, 100, 16, 20221208, NULL]",
+                        "+I[1, 101, 15, 20221208, 1]");
+    }
+
     @Test
     public void testUnawareBucketBatchCompact() throws Exception {
         Map<String, String> tableOptions = new HashMap<>();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
index 41d01bdf7f..c246f6a4bc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
@@ -107,6 +107,23 @@ public class CompactActionITCaseBase extends ActionITCaseBase {
         assertThat(snapshot.commitKind()).isEqualTo(commitKind);
     }
 
+    protected void checkLatestSnapshot(
+            FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind, long timeout)
+            throws Exception {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        long start = System.currentTimeMillis();
+        while (!Objects.equals(snapshotManager.latestSnapshotId(), snapshotId)) {
+            Thread.sleep(500);
+            if (System.currentTimeMillis() - start > timeout) {
+                throw new RuntimeException("timed out waiting for a compaction.");
+            }
+        }
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(snapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(commitKind);
+    }
+
     protected FileStoreTable prepareTable(
             List<String> partitionKeys,
             List<String> primaryKeys,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactRefresherTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactRefresherTest.java
new file mode 100644
index 0000000000..aed46ba74c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactRefresherTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.sink.ConfigRefresher.configGroups;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompactRefresher}. */
+public class CompactRefresherTest {
+
+    @TempDir public java.nio.file.Path tempDir;
+
+    Catalog catalog;
+
+    @BeforeEach
+    public void before() throws Exception {
+        Options options = new Options();
+        options.set(WAREHOUSE, tempDir.toString());
+        options.set(CACHE_ENABLED, false);
+        CatalogContext context = CatalogContext.create(options);
+        catalog = CatalogFactory.createCatalog(context);
+        catalog.createDatabase("default", true);
+    }
+
+    @Test
+    public void testRefreshFieldsAndConfigs() throws Exception {
+        String detectGroups = "external-paths";
+        Map<String, String> options = new HashMap<>();
+        options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), detectGroups);
+        createTable(options);
+        FileStoreTable table1 = getTable();
+
+        table1.schemaManager()
+                .commitChanges(
+                        SchemaChange.addColumn("c", DataTypes.INT()),
+                        SchemaChange.setOption(
+                                CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+                                "round-robin"));
+        FileStoreTable table2 = getTable();
+        try (BatchTableWrite write = table2.newBatchWriteBuilder().newWrite();
+                BatchTableCommit commit = table2.newBatchWriteBuilder().newCommit()) {
+            write.write(GenericRow.of(1, 1, 1));
+            commit.commit(write.prepareCommit());
+        }
+
+        List<DataField> dataFields = new ArrayList<>();
+        Map<String, String> refreshedOptions = new HashMap<>();
+        Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
+        CompactRefresher writerRefresher =
+                CompactRefresher.create(
+                        true,
+                        table1,
+                        new ConfigRefresherTest.TestWriteRefresher(
+                                groups, refreshedOptions, dataFields));
+        writerRefresher.tryRefresh(
+                table2.newSnapshotReader().read().dataSplits().get(0).dataFiles());
+        assertThat(dataFields).isEqualTo(table2.schema().fields());
+        assertThat(refreshedOptions).isEqualTo(configGroups(groups, table2.coreOptions()));
+
+        // should not refresh again
+        writerRefresher.tryRefresh(Collections.emptyList());
+    }
+
+    @Test
+    public void testRefreshOnlyFields() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        createTable(options);
+        FileStoreTable table1 = getTable();
+
+        table1.schemaManager().commitChanges(SchemaChange.addColumn("c", DataTypes.INT()));
+        FileStoreTable table2 = getTable();
+        try (BatchTableWrite write = table2.newBatchWriteBuilder().newWrite();
+                BatchTableCommit commit = table2.newBatchWriteBuilder().newCommit()) {
+            write.write(GenericRow.of(1, 1, 1));
+            commit.commit(write.prepareCommit());
+        }
+
+        List<DataField> dataFields = new ArrayList<>();
+        CompactRefresher writerRefresher =
+                CompactRefresher.create(
+                        true,
+                        table1,
+                        new ConfigRefresherTest.TestWriteRefresher(
+                                null, Collections.emptyMap(), dataFields));
+        writerRefresher.tryRefresh(
+                table2.newSnapshotReader().read().dataSplits().get(0).dataFiles());
+        assertThat(dataFields).isEqualTo(table2.schema().fields());
+    }
+
+    @Test
+    public void testRefreshOnlyConfigs() throws Exception {
+        String detectGroups = "external-paths";
+        Map<String, String> options = new HashMap<>();
+        options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), detectGroups);
+        createTable(options);
+        FileStoreTable table1 = getTable();
+
+        table1.schemaManager()
+                .commitChanges(
+                        SchemaChange.setOption(
+                                CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+                                "round-robin"));
+        FileStoreTable table2 = getTable();
+
+        Map<String, String> refreshedOptions = new HashMap<>();
+        Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
+        CompactRefresher writerRefresher =
+                CompactRefresher.create(
+                        true,
+                        table1,
+                        new ConfigRefresherTest.TestWriteRefresher(groups, refreshedOptions, null));
+        writerRefresher.tryRefresh(Collections.emptyList());
+        assertThat(refreshedOptions).isEqualTo(configGroups(groups, table2.coreOptions()));
+    }
+
+    private void createTable(Map<String, String> options) throws Exception {
+        catalog.createTable(
+                Identifier.create("default", "T"),
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .options(options)
+                        .build(),
+                false);
+    }
+
+    private FileStoreTable getTable() throws Exception {
+        return (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ConfigRefresherTest.java
similarity index 83%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ConfigRefresherTest.java
index 10a793753e..9e51a1cc8d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/ConfigRefresherTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -36,17 +37,18 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.flink.sink.WriterRefresher.configGroups;
+import static org.apache.paimon.flink.sink.ConfigRefresher.configGroups;
 import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link WriterRefresher}. */
-public class WriterRefresherTest {
+/** Test for {@link ConfigRefresher}. */
+public class ConfigRefresherTest {
     @TempDir public java.nio.file.Path tempDir;
 
     Catalog catalog;
@@ -81,12 +83,12 @@ public class WriterRefresherTest {
 
         Map<String, String> refreshedOptions = new HashMap<>();
         Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
-        WriterRefresher writerRefresher =
-                WriterRefresher.create(
+        ConfigRefresher configRefresher =
+                ConfigRefresher.create(
                        true, table1, new TestWriteRefresher(groups, refreshedOptions));
-        writerRefresher.tryRefresh();
+        configRefresher.tryRefresh();
         assertThat(refreshedOptions).isEqualTo(configGroups(groups, table2.coreOptions()));
-        writerRefresher.tryRefresh();
+        configRefresher.tryRefresh();
     }
 
     @Test
@@ -95,9 +97,9 @@ public class WriterRefresherTest {
         Map<String, String> options = new HashMap<>();
         createTable(options);
         FileStoreTable table1 = getTable();
-        WriterRefresher writerRefresher =
-                WriterRefresher.create(true, table1, new TestWriteRefresher(null, null));
-        assertThat(writerRefresher).isNull();
+        ConfigRefresher configRefresher =
+                ConfigRefresher.create(true, table1, new TestWriteRefresher(null, null));
+        assertThat(configRefresher).isNull();
     }
 
     @Test
@@ -107,9 +109,9 @@ public class WriterRefresherTest {
         options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), "");
         createTable(options);
         FileStoreTable table1 = getTable();
-        WriterRefresher writerRefresher =
-                WriterRefresher.create(true, table1, new TestWriteRefresher(null, null));
-        assertThat(writerRefresher).isNull();
+        ConfigRefresher configRefresher =
+                ConfigRefresher.create(true, table1, new TestWriteRefresher(null, null));
+        assertThat(configRefresher).isNull();
     }
 
     @Test
@@ -126,9 +128,9 @@ public class WriterRefresherTest {
                         .filter(s -> !s.trim().isEmpty())
                         .collect(Collectors.toSet());
 
-        WriterRefresher writerRefresher =
-                WriterRefresher.create(true, table1, new TestWriteRefresher(emptyGroups, null));
-        assertThat(writerRefresher).isNull();
+        ConfigRefresher configRefresher =
+                ConfigRefresher.create(true, table1, new TestWriteRefresher(emptyGroups, null));
+        assertThat(configRefresher).isNull();
     }
 
     @Test
@@ -144,12 +146,12 @@ public class WriterRefresherTest {
         refreshedOptions.put("initial", "value");
 
         Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
-        WriterRefresher writerRefresher =
-                WriterRefresher.create(
+        ConfigRefresher configRefresher =
+                ConfigRefresher.create(
                        true, table1, new TestWriteRefresher(groups, refreshedOptions));
 
         // No schema changes made, should not refresh
-        writerRefresher.tryRefresh();
+        configRefresher.tryRefresh();
 
         // Options should remain unchanged
         assertThat(refreshedOptions).containsEntry("initial", "value");
@@ -180,12 +182,12 @@ public class WriterRefresherTest {
         refreshedOptions.put("initial", "value");
 
         Set<String> groups = Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
-        WriterRefresher writerRefresher =
-                WriterRefresher.create(
+        ConfigRefresher configRefresher =
+                ConfigRefresher.create(
                        true, table1, new TestWriteRefresher(groups, refreshedOptions));
 
         // Should not refresh when monitored config groups haven't changed
-        writerRefresher.tryRefresh();
+        configRefresher.tryRefresh();
 
         // Options should remain unchanged
         assertThat(refreshedOptions).containsEntry("initial", "value");
@@ -207,14 +209,22 @@ public class WriterRefresherTest {
         return (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
     }
 
-    private static class TestWriteRefresher implements WriterRefresher.Refresher {
+    /** Test implementation of {@link WriteRefresher}. */
+    protected static class TestWriteRefresher implements WriteRefresher {
 
         private final Set<String> groups;
         private final Map<String, String> options;
+        private final List<DataField> dataFields;
 
         TestWriteRefresher(Set<String> groups, Map<String, String> options) {
+            this(groups, options, null);
+        }
+
+        TestWriteRefresher(
+                Set<String> groups, Map<String, String> options, List<DataField> fields) {
             this.groups = groups;
             this.options = options;
+            this.dataFields = fields;
         }
 
         @Override
@@ -223,6 +233,10 @@ public class WriterRefresherTest {
             if (groups != null) {
                 options.putAll(configGroups(groups, table.coreOptions()));
             }
+            if (dataFields != null) {
+                dataFields.clear();
+                dataFields.addAll(table.schema().fields());
+            }
         }
     }
 }

