This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cb22c3e68 [flink] Refactor WriterRefresher to better abstraction (#6070)
2cb22c3e68 is described below

commit 2cb22c3e68c50c44831b341197409baabacfc9b8
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Aug 13 13:54:01 2025 +0800

    [flink] Refactor WriterRefresher to better abstraction (#6070)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  12 ---
 .../paimon/flink/compact/AppendTableCompactor.java |  37 ++++----
 .../paimon/flink/sink/StoreCompactOperator.java    |  15 +---
 .../paimon/flink/sink/TableWriteOperator.java      |  15 +---
 .../apache/paimon/flink/sink/WriterRefresher.java  |  76 ++++++++++------
 .../paimon/flink/sink/WriterRefresherTest.java     | 100 ++++++---------------
 6 files changed, 98 insertions(+), 157 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 488445352c..e0f9af088d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2683,18 +2683,6 @@ public class CoreOptions implements Serializable {
         return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
     }
 
-    public Map<String, String> configGroups(Set<String> groups) {
-        Map<String, String> configs = new HashMap<>();
-        // external-paths config group
-        String externalPaths = "external-paths";
-        if (groups.contains(externalPaths)) {
-            configs.put(DATA_FILE_EXTERNAL_PATHS.key(), 
dataFileExternalPaths());
-            configs.put(DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), 
externalPathStrategy().toString());
-            configs.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), 
externalSpecificFS());
-        }
-        return configs;
-    }
-
     public Boolean forceRewriteAllFiles() {
         return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
index 45d60cac24..4b85eb5169 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactor.java
@@ -22,11 +22,12 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.WriterRefresher;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
-import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite.State;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.operation.metrics.MetricUtils;
 import org.apache.paimon.table.FileStoreTable;
@@ -56,17 +57,15 @@ public class AppendTableCompactor {
     private static final Logger LOG = 
LoggerFactory.getLogger(AppendTableCompactor.class);
 
     private FileStoreTable table;
-    private final String commitUser;
-
-    private transient BaseAppendFileStoreWrite write;
+    private BaseAppendFileStoreWrite write;
 
-    protected final transient Queue<Future<CommitMessage>> result;
-
-    private final transient Supplier<ExecutorService> compactExecutorsupplier;
-    @Nullable private final transient CompactionMetrics compactionMetrics;
-    @Nullable private final transient CompactionMetrics.Reporter 
metricsReporter;
+    private final String commitUser;
+    protected final Queue<Future<CommitMessage>> result;
+    private final Supplier<ExecutorService> compactExecutorsupplier;
+    @Nullable private final CompactionMetrics compactionMetrics;
+    @Nullable private final CompactionMetrics.Reporter metricsReporter;
 
-    @Nullable protected final transient 
WriterRefresher<BaseAppendFileStoreWrite> writeRefresher;
+    @Nullable protected final WriterRefresher writeRefresher;
 
     public AppendTableCompactor(
             FileStoreTable table,
@@ -92,12 +91,7 @@ public class AppendTableCompactor {
                         ? null
                         // partition and bucket fields are no use.
                         : 
this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
-        if (isStreaming) {
-            this.writeRefresher =
-                    new WriterRefresher<>(table, write, (newTable, writer) -> 
replace(newTable));
-        } else {
-            this.writeRefresher = null;
-        }
+        this.writeRefresher = WriterRefresher.create(isStreaming, table, 
this::replace);
     }
 
     public void processElement(AppendCompactTask task) throws Exception {
@@ -218,18 +212,17 @@ public class AppendTableCompactor {
     }
 
     private void replace(FileStoreTable newTable) throws Exception {
-
-        List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
-        write.close();
-        write = (BaseAppendFileStoreWrite) 
newTable.store().newWrite(commitUser);
-        write.restore((List) states);
+        this.table = newTable;
+        List<State<InternalRow>> states = write.checkpoint();
+        this.write.close();
+        this.write = (BaseAppendFileStoreWrite) 
newTable.store().newWrite(commitUser);
+        this.write.restore(states);
     }
 
     public void tryRefreshWrite() {
         if (commitUser == null) {
             return;
         }
-
         if (writeRefresher != null) {
             writeRefresher.tryRefresh();
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 47398db852..7676084b94 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -70,7 +70,7 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
     private transient DataFileMetaSerializer dataFileMetaSerializer;
     private transient Set<Pair<BinaryRow, Integer>> waitToCompact;
 
-    protected transient @Nullable WriterRefresher<StoreSinkWrite> 
writeRefresher;
+    protected transient @Nullable WriterRefresher writeRefresher;
 
     public StoreCompactOperator(
             StreamOperatorParameters<Committable> parameters,
@@ -119,18 +119,7 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
                         getContainingTask().getEnvironment().getIOManager(),
                         memoryPool,
                         getMetricGroup());
-
-        if (write.streamingMode()) {
-            writeRefresher =
-                    new WriterRefresher<>(
-                            table,
-                            write,
-                            (newTable, writer) -> {
-                                writer.replace(newTable);
-                            });
-        } else {
-            writeRefresher = null;
-        }
+        this.writeRefresher = WriterRefresher.create(write.streamingMode(), 
table, write::replace);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index bdce88e2ea..ca27059265 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -58,7 +58,7 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
     protected transient StoreSinkWriteState state;
     protected transient StoreSinkWrite write;
 
-    protected transient @Nullable WriterRefresher<StoreSinkWrite> 
writeRefresher;
+    protected transient @Nullable WriterRefresher writeRefresher;
 
     public TableWriteOperator(
             StreamOperatorParameters<Committable> parameters,
@@ -99,18 +99,7 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
         if (writeRestore != null) {
             write.setWriteRestore(writeRestore);
         }
-
-        if (write.streamingMode()) {
-            writeRefresher =
-                    new WriterRefresher<>(
-                            table,
-                            write,
-                            (newTable, writer) -> {
-                                writer.replace(newTable);
-                            });
-        } else {
-            writeRefresher = null;
-        }
+        this.writeRefresher = WriterRefresher.create(write.streamingMode(), 
table, write::replace);
     }
 
     public void setWriteRestore(@Nullable WriteRestore writeRestore) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
index 37d3a5cc9d..5dca871af5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WriterRefresher.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,49 +30,66 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS;
+import static 
org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS;
+import static org.apache.paimon.CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY;
+import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
+
 /** Writer refresher for refresh write when configs changed. */
-public class WriterRefresher<T> {
+public class WriterRefresher {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(WriterRefresher.class);
 
-    @Nullable private final Set<String> configGroups;
-    private final Refresher<T> refresher;
     private FileStoreTable table;
-    private T write;
+    private final Refresher refresher;
+    private final Set<String> configGroups;
 
-    public WriterRefresher(FileStoreTable table, T write, Refresher<T> 
refresher) {
+    private WriterRefresher(FileStoreTable table, Refresher refresher, 
Set<String> configGroups) {
         this.table = table;
-        this.write = write;
         this.refresher = refresher;
+        this.configGroups = configGroups;
+    }
+
+    @Nullable
+    public static WriterRefresher create(
+            boolean isStreaming, FileStoreTable table, Refresher refresher) {
+        if (!isStreaming) {
+            return null;
+        }
+
         String refreshDetectors =
                 Options.fromMap(table.options())
                         
.get(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS);
-        if (StringUtils.isNullOrWhitespaceOnly(refreshDetectors)) {
-            configGroups = null;
-        } else {
-            configGroups = 
Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
+        Set<String> configGroups =
+                isNullOrWhitespaceOnly(refreshDetectors)
+                        ? null
+                        : 
Arrays.stream(refreshDetectors.split(",")).collect(Collectors.toSet());
+        if (configGroups == null || configGroups.isEmpty()) {
+            return null;
         }
+        return new WriterRefresher(table, refresher, configGroups);
     }
 
     public void tryRefresh() {
-        if (configGroups == null || configGroups.isEmpty()) {
+        Optional<TableSchema> latestSchema = table.schemaManager().latest();
+        if (!latestSchema.isPresent()) {
             return;
         }
 
-        Optional<TableSchema> latestSchema = table.schemaManager().latest();
-        if (latestSchema.isPresent() && latestSchema.get().id() > 
table.schema().id()) {
+        TableSchema latest = latestSchema.get();
+        if (latest.id() > table.schema().id()) {
             try {
                 Map<String, String> currentOptions =
-                        
CoreOptions.fromMap(table.schema().options()).configGroups(configGroups);
+                        configGroups(configGroups, table.coreOptions());
                 Map<String, String> newOptions =
-                        CoreOptions.fromMap(latestSchema.get().options())
-                                .configGroups(configGroups);
+                        configGroups(configGroups, 
CoreOptions.fromMap(latest.options()));
 
                 if (!Objects.equals(newOptions, currentOptions)) {
                     if (LOG.isDebugEnabled()) {
@@ -86,7 +102,7 @@ public class WriterRefresher<T> {
                                 newOptions);
                     }
                     table = table.copy(newOptions);
-                    refresher.refresh(table, write);
+                    refresher.refresh(table);
                 }
             } catch (Exception e) {
                 throw new RuntimeException("update write failed.", e);
@@ -94,12 +110,22 @@ public class WriterRefresher<T> {
         }
     }
 
-    /**
-     * Refresher for refresh write when configs changed.
-     *
-     * @param <T> the type of writer.
-     */
-    public interface Refresher<T> {
-        void refresh(FileStoreTable table, T writer) throws Exception;
+    /** Refresher when configs changed. */
+    public interface Refresher {
+        void refresh(FileStoreTable table) throws Exception;
+    }
+
+    public static Map<String, String> configGroups(Set<String> groups, 
CoreOptions options) {
+        Map<String, String> configs = new HashMap<>();
+        // external-paths config group
+        String externalPaths = "external-paths";
+        if (groups.contains(externalPaths)) {
+            configs.put(DATA_FILE_EXTERNAL_PATHS.key(), 
options.dataFileExternalPaths());
+            configs.put(
+                    DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+                    options.externalPathStrategy().toString());
+            configs.put(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), 
options.externalSpecificFS());
+        }
+        return configs;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
index ca4e3a76b6..10a793753e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterRefresherTest.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.flink.sink.WriterRefresher.configGroups;
 import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -80,10 +81,11 @@ public class WriterRefresherTest {
 
         Map<String, String> refreshedOptions = new HashMap<>();
         Set<String> groups = 
Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
-        WriterRefresher<?> writerRefresher =
-                new WriterRefresher<>(table1, refreshedOptions, new 
TestWriteRefresher(groups));
+        WriterRefresher writerRefresher =
+                WriterRefresher.create(
+                        true, table1, new TestWriteRefresher(groups, 
refreshedOptions));
         writerRefresher.tryRefresh();
-        
assertThat(refreshedOptions).isEqualTo(table2.coreOptions().configGroups(groups));
+        assertThat(refreshedOptions).isEqualTo(configGroups(groups, 
table2.coreOptions()));
         writerRefresher.tryRefresh();
     }
 
@@ -92,27 +94,10 @@ public class WriterRefresherTest {
         // Create table without SINK_WRITER_REFRESH_DETECTORS option
         Map<String, String> options = new HashMap<>();
         createTable(options);
-
         FileStoreTable table1 = getTable();
-
-        // Make schema changes
-        table1.schemaManager()
-                .commitChanges(
-                        SchemaChange.setOption(
-                                CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), 
"external-path1"));
-
-        Map<String, String> refreshedOptions = new HashMap<>();
-        refreshedOptions.put("initial", "value");
-
-        WriterRefresher<?> writerRefresher =
-                new WriterRefresher<>(table1, refreshedOptions, new 
TestWriteRefresher(null));
-
-        // Should not refresh when configGroups is null
-        writerRefresher.tryRefresh();
-
-        // Options should remain unchanged
-        assertThat(refreshedOptions).containsEntry("initial", "value");
-        assertThat(refreshedOptions).hasSize(1);
+        WriterRefresher writerRefresher =
+                WriterRefresher.create(true, table1, new 
TestWriteRefresher(null, null));
+        assertThat(writerRefresher).isNull();
     }
 
     @Test
@@ -121,27 +106,10 @@ public class WriterRefresherTest {
         Map<String, String> options = new HashMap<>();
         options.put(FlinkConnectorOptions.SINK_WRITER_REFRESH_DETECTORS.key(), 
"");
         createTable(options);
-
         FileStoreTable table1 = getTable();
-
-        // Make schema changes
-        table1.schemaManager()
-                .commitChanges(
-                        SchemaChange.setOption(
-                                CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), 
"external-path1"));
-
-        Map<String, String> refreshedOptions = new HashMap<>();
-        refreshedOptions.put("initial", "value");
-
-        WriterRefresher<?> writerRefresher =
-                new WriterRefresher<>(table1, refreshedOptions, new 
TestWriteRefresher(null));
-
-        // Should not refresh when configGroups is empty
-        writerRefresher.tryRefresh();
-
-        // Options should remain unchanged since empty configGroups should 
trigger early return
-        assertThat(refreshedOptions).containsEntry("initial", "value");
-        assertThat(refreshedOptions).hasSize(1);
+        WriterRefresher writerRefresher =
+                WriterRefresher.create(true, table1, new 
TestWriteRefresher(null, null));
+        assertThat(writerRefresher).isNull();
     }
 
     @Test
@@ -153,30 +121,14 @@ public class WriterRefresherTest {
 
         FileStoreTable table1 = getTable();
 
-        // Make schema changes
-        table1.schemaManager()
-                .commitChanges(
-                        SchemaChange.setOption(
-                                CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), 
"external-path1"));
-
-        Map<String, String> refreshedOptions = new HashMap<>();
-        refreshedOptions.put("initial", "value");
-
         Set<String> emptyGroups =
                 Arrays.stream(",,,".split(","))
                         .filter(s -> !s.trim().isEmpty())
                         .collect(Collectors.toSet());
 
-        WriterRefresher<?> writerRefresher =
-                new WriterRefresher<>(
-                        table1, refreshedOptions, new 
TestWriteRefresher(emptyGroups));
-
-        // Should not refresh when configGroups is effectively empty
-        writerRefresher.tryRefresh();
-
-        // Options should remain unchanged
-        assertThat(refreshedOptions).containsEntry("initial", "value");
-        assertThat(refreshedOptions).hasSize(1);
+        WriterRefresher writerRefresher =
+                WriterRefresher.create(true, table1, new 
TestWriteRefresher(emptyGroups, null));
+        assertThat(writerRefresher).isNull();
     }
 
     @Test
@@ -192,8 +144,9 @@ public class WriterRefresherTest {
         refreshedOptions.put("initial", "value");
 
         Set<String> groups = 
Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
-        WriterRefresher<?> writerRefresher =
-                new WriterRefresher<>(table1, refreshedOptions, new 
TestWriteRefresher(groups));
+        WriterRefresher writerRefresher =
+                WriterRefresher.create(
+                        true, table1, new TestWriteRefresher(groups, 
refreshedOptions));
 
         // No schema changes made, should not refresh
         writerRefresher.tryRefresh();
@@ -227,8 +180,9 @@ public class WriterRefresherTest {
         refreshedOptions.put("initial", "value");
 
         Set<String> groups = 
Arrays.stream(detectGroups.split(",")).collect(Collectors.toSet());
-        WriterRefresher<?> writerRefresher =
-                new WriterRefresher<>(table1, refreshedOptions, new 
TestWriteRefresher(groups));
+        WriterRefresher writerRefresher =
+                WriterRefresher.create(
+                        true, table1, new TestWriteRefresher(groups, 
refreshedOptions));
 
         // Should not refresh when monitored config groups haven't changed
         writerRefresher.tryRefresh();
@@ -253,19 +207,21 @@ public class WriterRefresherTest {
         return (FileStoreTable) catalog.getTable(Identifier.create("default", 
"T"));
     }
 
-    private static class TestWriteRefresher
-            implements WriterRefresher.Refresher<Map<String, String>> {
-        Set<String> groups;
+    private static class TestWriteRefresher implements 
WriterRefresher.Refresher {
+
+        private final Set<String> groups;
+        private final Map<String, String> options;
 
-        TestWriteRefresher(Set<String> groups) {
+        TestWriteRefresher(Set<String> groups, Map<String, String> options) {
             this.groups = groups;
+            this.options = options;
         }
 
         @Override
-        public void refresh(FileStoreTable table, Map<String, String> options) 
throws Exception {
+        public void refresh(FileStoreTable table) {
             options.clear();
             if (groups != null) {
-                options.putAll(table.coreOptions().configGroups(groups));
+                options.putAll(configGroups(groups, table.coreOptions()));
             }
         }
     }

Reply via email to