This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 380f367236 [flink] Move FlinkSink#createWriteProvider to a common place so CDC sink can also use it (#5824)
380f367236 is described below

commit 380f367236affbe544445752c97245b9fb4d861f
Author: tsreaper <[email protected]>
AuthorDate: Tue Jul 1 22:09:34 2025 +0800

    [flink] Move FlinkSink#createWriteProvider to a common place so CDC sink can also use it (#5824)
---
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 94 +--------------------
 .../sink/MultiTablesStoreCompactOperator.java      | 76 +----------------
 .../apache/paimon/flink/sink/StoreSinkWrite.java   | 98 ++++++++++++++++++++++
 3 files changed, 103 insertions(+), 165 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 6349dc3972..6ce295b31d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.TagCreationMode;
 import 
org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator;
 import org.apache.paimon.flink.compact.changelog.ChangelogCompactSortOperator;
@@ -29,8 +27,6 @@ import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SerializableRunnable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
@@ -46,7 +42,6 @@ import 
org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 import javax.annotation.Nullable;
 
@@ -56,9 +51,7 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
 
-import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.createCommitUser;
-import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
 import static org.apache.paimon.flink.FlinkConnectorOptions.PRECOMMIT_COMPACT;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -92,89 +85,6 @@ public abstract class FlinkSink<T> implements Serializable {
         this.ignorePreviousFiles = ignorePreviousFiles;
     }
 
-    private StoreSinkWrite.Provider createWriteProvider(
-            CheckpointConfig checkpointConfig, boolean isStreaming, boolean 
hasSinkMaterializer) {
-        SerializableRunnable assertNoSinkMaterializer =
-                () ->
-                        Preconditions.checkArgument(
-                                !hasSinkMaterializer,
-                                String.format(
-                                        "Sink materializer must not be used 
with Paimon sink. "
-                                                + "Please set '%s' to '%s' in 
Flink's config.",
-                                        
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE
-                                                .key(),
-                                        
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
-
-        Options options = table.coreOptions().toConfiguration();
-        ChangelogProducer changelogProducer = 
table.coreOptions().changelogProducer();
-        boolean waitCompaction;
-        CoreOptions coreOptions = table.coreOptions();
-        if (coreOptions.writeOnly()) {
-            waitCompaction = false;
-        } else {
-            waitCompaction = coreOptions.prepareCommitWaitCompaction();
-            int deltaCommits = -1;
-            if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
-                deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
-            } else if 
(options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
-                long fullCompactionThresholdMs =
-                        
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
-                deltaCommits =
-                        (int)
-                                (fullCompactionThresholdMs
-                                        / 
checkpointConfig.getCheckpointInterval());
-            }
-
-            if (changelogProducer == ChangelogProducer.FULL_COMPACTION || 
deltaCommits >= 0) {
-                int finalDeltaCommits = Math.max(deltaCommits, 1);
-                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
-                    assertNoSinkMaterializer.run();
-                    return new GlobalFullCompactionSinkWrite(
-                            table,
-                            commitUser,
-                            state,
-                            ioManager,
-                            ignorePreviousFiles,
-                            waitCompaction,
-                            finalDeltaCommits,
-                            isStreaming,
-                            memoryPool,
-                            metricGroup);
-                };
-            }
-
-            if (coreOptions.needLookup()) {
-                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
-                    assertNoSinkMaterializer.run();
-                    return new LookupSinkWrite(
-                            table,
-                            commitUser,
-                            state,
-                            ioManager,
-                            ignorePreviousFiles,
-                            waitCompaction,
-                            isStreaming,
-                            memoryPool,
-                            metricGroup);
-                };
-            }
-        }
-
-        return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
-> {
-            assertNoSinkMaterializer.run();
-            return new StoreSinkWriteImpl(
-                    table,
-                    commitUser,
-                    state,
-                    ioManager,
-                    ignorePreviousFiles,
-                    waitCompaction,
-                    isStreaming,
-                    memoryPool,
-                    metricGroup);
-        };
-    }
-
     public DataStreamSink<?> sinkFrom(DataStream<T> input) {
         // This commitUser is valid only for new jobs.
         // After the job starts, this commitUser will be recorded into the 
states of write and
@@ -223,9 +133,11 @@ public abstract class FlinkSink<T> implements Serializable 
{
                         (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " 
: " + table.name(),
                         new CommittableTypeInfo(),
                         createWriteOperatorFactory(
-                                createWriteProvider(
+                                StoreSinkWrite.createWriteProvider(
+                                        table,
                                         env.getCheckpointConfig(),
                                         isStreaming,
+                                        ignorePreviousFiles,
                                         hasSinkMaterializer(input)),
                                 commitUser));
         if (parallelism == null) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 0dad216ab0..3687fea6f1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -47,8 +47,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
-import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 
 /**
@@ -155,7 +153,8 @@ public class MultiTablesStoreCompactOperator
                         + " should not be true for 
MultiTablesStoreCompactOperator.");
 
         storeSinkWriteProvider =
-                createWriteProvider(table, checkpointConfig, isStreaming, 
ignorePreviousFiles);
+                StoreSinkWrite.createWriteProvider(
+                        table, checkpointConfig, isStreaming, 
ignorePreviousFiles, false);
 
         StoreSinkWrite write =
                 writes.computeIfAbsent(
@@ -253,77 +252,6 @@ public class MultiTablesStoreCompactOperator
         return table;
     }
 
-    private StoreSinkWrite.Provider createWriteProvider(
-            FileStoreTable fileStoreTable,
-            CheckpointConfig checkpointConfig,
-            boolean isStreaming,
-            boolean ignorePreviousFiles) {
-        Options options = fileStoreTable.coreOptions().toConfiguration();
-        CoreOptions.ChangelogProducer changelogProducer =
-                fileStoreTable.coreOptions().changelogProducer();
-        boolean waitCompaction;
-        CoreOptions coreOptions = fileStoreTable.coreOptions();
-        if (coreOptions.writeOnly()) {
-            waitCompaction = false;
-        } else {
-            waitCompaction = coreOptions.prepareCommitWaitCompaction();
-            int deltaCommits = -1;
-            if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
-                deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
-            } else if 
(options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
-                long fullCompactionThresholdMs =
-                        
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
-                deltaCommits =
-                        (int)
-                                (fullCompactionThresholdMs
-                                        / 
checkpointConfig.getCheckpointInterval());
-            }
-
-            if (changelogProducer == 
CoreOptions.ChangelogProducer.FULL_COMPACTION
-                    || deltaCommits >= 0) {
-                int finalDeltaCommits = Math.max(deltaCommits, 1);
-                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
-                        new GlobalFullCompactionSinkWrite(
-                                table,
-                                commitUser,
-                                state,
-                                ioManager,
-                                ignorePreviousFiles,
-                                waitCompaction,
-                                finalDeltaCommits,
-                                isStreaming,
-                                memoryPool,
-                                metricGroup);
-            }
-        }
-
-        if (coreOptions.needLookup()) {
-            return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
-                    new LookupSinkWrite(
-                            table,
-                            commitUser,
-                            state,
-                            ioManager,
-                            ignorePreviousFiles,
-                            waitCompaction,
-                            isStreaming,
-                            memoryPool,
-                            metricGroup);
-        }
-
-        return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
-                new StoreSinkWriteImpl(
-                        table,
-                        commitUser,
-                        state,
-                        ioManager,
-                        ignorePreviousFiles,
-                        waitCompaction,
-                        isStreaming,
-                        memoryPool,
-                        metricGroup);
-    }
-
     /** {@link StreamOperatorFactory} of {@link 
MultiTablesStoreCompactOperator}. */
     public static class Factory
             extends PrepareCommitOperator.Factory<RowData, 
MultiTableCommittable> {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 5bd2e9f9e1..d88cbea6f0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -18,18 +18,24 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.operation.WriteRestore;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SerializableRunnable;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 import javax.annotation.Nullable;
 
@@ -37,6 +43,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
+import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
+
 /** Helper class of {@link PrepareCommitOperator} for different types of 
paimon sinks. */
 public interface StoreSinkWrite {
 
@@ -91,6 +100,95 @@ public interface StoreSinkWrite {
                 @Nullable MetricGroup metricGroup);
     }
 
+    static StoreSinkWrite.Provider createWriteProvider(
+            FileStoreTable fileStoreTable,
+            CheckpointConfig checkpointConfig,
+            boolean isStreaming,
+            boolean ignorePreviousFiles,
+            boolean hasSinkMaterializer) {
+        SerializableRunnable assertNoSinkMaterializer =
+                () ->
+                        Preconditions.checkArgument(
+                                !hasSinkMaterializer,
+                                String.format(
+                                        "Sink materializer must not be used 
with Paimon sink. "
+                                                + "Please set '%s' to '%s' in 
Flink's config.",
+                                        
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE
+                                                .key(),
+                                        
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+
+        Options options = fileStoreTable.coreOptions().toConfiguration();
+        CoreOptions.ChangelogProducer changelogProducer =
+                fileStoreTable.coreOptions().changelogProducer();
+        boolean waitCompaction;
+        CoreOptions coreOptions = fileStoreTable.coreOptions();
+        if (coreOptions.writeOnly()) {
+            waitCompaction = false;
+        } else {
+            waitCompaction = coreOptions.prepareCommitWaitCompaction();
+            int deltaCommits = -1;
+            if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
+                deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
+            } else if 
(options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
+                long fullCompactionThresholdMs =
+                        
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
+                deltaCommits =
+                        (int)
+                                (fullCompactionThresholdMs
+                                        / 
checkpointConfig.getCheckpointInterval());
+            }
+
+            if (changelogProducer == 
CoreOptions.ChangelogProducer.FULL_COMPACTION
+                    || deltaCommits >= 0) {
+                int finalDeltaCommits = Math.max(deltaCommits, 1);
+                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
+                    assertNoSinkMaterializer.run();
+                    return new GlobalFullCompactionSinkWrite(
+                            table,
+                            commitUser,
+                            state,
+                            ioManager,
+                            ignorePreviousFiles,
+                            waitCompaction,
+                            finalDeltaCommits,
+                            isStreaming,
+                            memoryPool,
+                            metricGroup);
+                };
+            }
+
+            if (coreOptions.needLookup()) {
+                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
+                    assertNoSinkMaterializer.run();
+                    return new LookupSinkWrite(
+                            table,
+                            commitUser,
+                            state,
+                            ioManager,
+                            ignorePreviousFiles,
+                            waitCompaction,
+                            isStreaming,
+                            memoryPool,
+                            metricGroup);
+                };
+            }
+        }
+
+        return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
-> {
+            assertNoSinkMaterializer.run();
+            return new StoreSinkWriteImpl(
+                    table,
+                    commitUser,
+                    state,
+                    ioManager,
+                    ignorePreviousFiles,
+                    waitCompaction,
+                    isStreaming,
+                    memoryPool,
+                    metricGroup);
+        };
+    }
+
     /** Provider of {@link StoreSinkWrite} that uses given write buffer. */
     @FunctionalInterface
     interface WithWriteBufferProvider extends Serializable {

Reply via email to