This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 744ea92bfb [flink] Writer operator support SlotSharingGroup settings (#5698)
744ea92bfb is described below

commit 744ea92bfb848a6b968abb58ad506d4389becf86
Author: yuzelin <[email protected]>
AuthorDate: Sat Jun 7 09:58:11 2025 +0800

    [flink] Writer operator support SlotSharingGroup settings (#5698)
---
 .../generated/flink_connector_configuration.html          | 12 ++++++++++++
 .../paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java     | 11 +++++++++--
 .../flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java   |  6 ++++++
 .../paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java |  2 ++
 .../org/apache/paimon/flink/FlinkConnectorOptions.java    | 12 ++++++++++++
 .../main/java/org/apache/paimon/flink/sink/FlinkSink.java | 15 ++++++++++-----
 6 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index b73ffc9dff..8b80512fc4 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -284,6 +284,18 @@ under the License.
             <td>Boolean</td>
             <td>If true, flink sink will use managed memory for merge tree; 
otherwise, it will create an independent memory allocator.</td>
         </tr>
+        <tr>
+            <td><h5>sink.writer-cpu</h5></td>
+            <td style="word-wrap: break-word;">1.0</td>
+            <td>Double</td>
+            <td>Sink writer cpu to control cpu cores of writer.</td>
+        </tr>
+        <tr>
+            <td><h5>sink.writer-memory</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Sink writer memory to control heap memory of writer.</td>
+        </tr>
         <tr>
             <td><h5>source.checkpoint-align.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index a825634a09..0cd2638179 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -50,7 +50,7 @@ import java.io.Serializable;
 import java.util.Collections;
 
 import static 
org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
-import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
+import static org.apache.paimon.flink.sink.FlinkSink.configureSlotSharingGroup;
 import static 
org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;
 
 /**
@@ -64,6 +64,8 @@ public class FlinkCdcMultiTableSink implements Serializable {
 
     private final boolean isOverwrite = false;
     private final CatalogLoader catalogLoader;
+    private final double writeCpuCores;
+    private final MemorySize writeHeapMemory;
     private final double commitCpuCores;
     @Nullable private final MemorySize commitHeapMemory;
     private final String commitUser;
@@ -72,12 +74,16 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
 
     public FlinkCdcMultiTableSink(
             CatalogLoader catalogLoader,
+            double writeCpuCores,
+            @Nullable MemorySize writeHeapMemory,
             double commitCpuCores,
             @Nullable MemorySize commitHeapMemory,
             String commitUser,
             boolean eagerInit,
             TableFilter tableFilter) {
         this.catalogLoader = catalogLoader;
+        this.writeCpuCores = writeCpuCores;
+        this.writeHeapMemory = writeHeapMemory;
         this.commitCpuCores = commitCpuCores;
         this.commitHeapMemory = commitHeapMemory;
         this.commitUser = commitUser;
@@ -120,6 +126,7 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
                 input.transform(
                         WRITER_NAME, typeInfo, 
createWriteOperator(sinkProvider, commitUser));
         forwardParallelism(written, input);
+        configureSlotSharingGroup(written, writeCpuCores, writeHeapMemory);
 
         // shuffle committables by table
         DataStream<MultiTableCommittable> partitioned =
@@ -139,7 +146,7 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
                                 createCommitterFactory(tableFilter),
                                 createCommittableStateManager()));
         forwardParallelism(committed, input);
-        configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
+        configureSlotSharingGroup(committed, commitCpuCores, commitHeapMemory);
         return committed.sinkTo(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index b53ec9c80f..d5c33da412 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -67,6 +67,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     private List<FileStoreTable> tables = new ArrayList<>();
 
     @Nullable private Integer parallelism;
+    private double writerCpu;
+    @Nullable private MemorySize writerMemory;
     private double committerCpu;
     @Nullable private MemorySize committerMemory;
 
@@ -107,6 +109,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
 
     public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options 
options) {
         this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
+        this.writerCpu = options.get(FlinkConnectorOptions.SINK_WRITER_CPU);
+        this.writerMemory = 
options.get(FlinkConnectorOptions.SINK_WRITER_MEMORY);
         this.committerCpu = 
options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
         this.committerMemory = 
options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
         this.commitUser = createCommitUser(options);
@@ -190,6 +194,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         FlinkCdcMultiTableSink sink =
                 new FlinkCdcMultiTableSink(
                         catalogLoader,
+                        writerCpu,
+                        writerMemory,
                         committerCpu,
                         committerMemory,
                         commitUser,
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
index 2371fbfb9d..8ed54fac67 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
@@ -50,6 +50,8 @@ public class FlinkCdcMultiTableSinkTest {
         FlinkCdcMultiTableSink sink =
                 new FlinkCdcMultiTableSink(
                         () -> FlinkCatalogFactory.createPaimonCatalog(new 
Options()),
+                        FlinkConnectorOptions.SINK_WRITER_CPU.defaultValue(),
+                        null,
                         
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
                         null,
                         UUID.randomUUID().toString(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index b39e9e57e3..9c0e9d6191 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -342,6 +342,18 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "If true, a tag will be automatically created for 
the snapshot created by flink savepoint.");
 
+    public static final ConfigOption<Double> SINK_WRITER_CPU =
+            ConfigOptions.key("sink.writer-cpu")
+                    .doubleType()
+                    .defaultValue(1.0)
+                    .withDescription("Sink writer cpu to control cpu cores of 
writer.");
+
+    public static final ConfigOption<MemorySize> SINK_WRITER_MEMORY =
+            ConfigOptions.key("sink.writer-memory")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription("Sink writer memory to control heap 
memory of writer.");
+
     public static final ConfigOption<Double> SINK_COMMITTER_CPU =
             ConfigOptions.key("sink.committer-cpu")
                     .doubleType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index ccceaa1a54..f518f0c8d1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -67,6 +67,8 @@ import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERA
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_CPU;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_MEMORY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
 import static 
org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
 import static 
org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;
@@ -242,6 +244,9 @@ public abstract class FlinkSink<T> implements Serializable {
             declareManagedMemory(written, 
options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
         }
 
+        configureSlotSharingGroup(
+                written, options.get(SINK_WRITER_CPU), 
options.get(SINK_WRITER_MEMORY));
+
         if (!table.primaryKeys().isEmpty() && options.get(PRECOMMIT_COMPACT)) {
             SingleOutputStreamOperator<Committable> newWritten =
                     written.transform(
@@ -318,13 +323,13 @@ public abstract class FlinkSink<T> implements 
Serializable {
         if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) {
             committed = committed.startNewChain();
         }
-        configureGlobalCommitter(
+        configureSlotSharingGroup(
                 committed, options.get(SINK_COMMITTER_CPU), 
options.get(SINK_COMMITTER_MEMORY));
         return committed.sinkTo(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
-    public static void configureGlobalCommitter(
-            SingleOutputStreamOperator<?> committed,
+    public static void configureSlotSharingGroup(
+            SingleOutputStreamOperator<?> operator,
             double cpuCores,
             @Nullable MemorySize heapMemory) {
         if (heapMemory == null) {
@@ -332,13 +337,13 @@ public abstract class FlinkSink<T> implements 
Serializable {
         }
 
         SlotSharingGroup slotSharingGroup =
-                SlotSharingGroup.newBuilder(committed.getName())
+                SlotSharingGroup.newBuilder(operator.getName())
                         .setCpuCores(cpuCores)
                         .setTaskHeapMemory(
                                 new org.apache.flink.configuration.MemorySize(
                                         heapMemory.getBytes()))
                         .build();
-        committed.slotSharingGroup(slotSharingGroup);
+        operator.slotSharingGroup(slotSharingGroup);
     }
 
     public static void assertStreamingConfiguration(StreamExecutionEnvironment 
env) {

Reply via email to