This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 801e18db4b [core][flink] Remove withMemoryPool in TableWriteImpl (#6265)
801e18db4b is described below

commit 801e18db4bb3dfb0e9ba604f97306a6d21a92151
Author: tsreaper <[email protected]>
AuthorDate: Tue Sep 16 16:30:40 2025 +0800

    [core][flink] Remove withMemoryPool in TableWriteImpl (#6265)
---
 .../apache/paimon/operation/FileStoreWrite.java    |  10 --
 .../org/apache/paimon/table/sink/TableWrite.java   |   7 +-
 .../apache/paimon/table/sink/TableWriteImpl.java   |   6 --
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java |  24 +----
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |   7 +-
 .../sink/cdc/CdcRecordStoreWriteOperatorTest.java  |   4 +-
 .../flink/sink/GlobalFullCompactionSinkWrite.java  |   6 +-
 .../apache/paimon/flink/sink/LookupSinkWrite.java  |   8 +-
 .../sink/MultiTablesStoreCompactOperator.java      |   2 +-
 .../paimon/flink/sink/PrepareCommitOperator.java   |  13 ++-
 .../paimon/flink/sink/StoreCompactOperator.java    |   2 +-
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |  32 ++-----
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  77 +--------------
 .../paimon/flink/sink/TableWriteOperator.java      |   2 +-
 .../paimon/flink/sink/CompactorSinkITCase.java     | 106 ---------------------
 .../flink/sink/StoreCompactOperatorTest.java       |   2 +-
 .../paimon/flink/sink/WriterOperatorTest.java      |   4 +-
 17 files changed, 51 insertions(+), 261 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index b268f7b7a5..ce71b8cd2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -25,7 +25,6 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.SinkRecord;
@@ -58,15 +57,6 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
         throw new UnsupportedOperationException();
     }
 
-    /**
-     * With memory pool for the current file store write.
-     *
-     * @param memoryPool the given memory pool.
-     */
-    default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
-        return withMemoryPoolFactory(new MemoryPoolFactory(memoryPool));
-    }
-
     /**
      * With memory pool factory for the current file store write.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 73956c30e0..a36f314cd6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.Table;
@@ -43,7 +44,11 @@ public interface TableWrite extends AutoCloseable {
     TableWrite withWriteType(RowType writeType);
 
     /** With {@link MemorySegmentPool} for the current table write. */
-    TableWrite withMemoryPool(MemorySegmentPool memoryPool);
+    default TableWrite withMemoryPool(MemorySegmentPool memoryPool) {
+        return withMemoryPoolFactory(new MemoryPoolFactory(memoryPool));
+    }
+
+    TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory);
 
     /** Calculate which partition {@code row} belongs to. */
     BinaryRow getPartition(InternalRow row);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 99e30bcccf..325911b0ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -27,7 +27,6 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.BundleFileStoreWriter;
 import org.apache.paimon.operation.FileStoreWrite;
@@ -120,11 +119,6 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
     }
 
     @Override
-    public TableWriteImpl<T> withMemoryPool(MemorySegmentPool memoryPool) {
-        write.withMemoryPool(memoryPool);
-        return this;
-    }
-
     public TableWriteImpl<T> withMemoryPoolFactory(MemoryPoolFactory 
memoryPoolFactory) {
         write.withMemoryPoolFactory(memoryPoolFactory);
         return this;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index ea6008d872..da612bd337 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -31,8 +31,6 @@ import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.sink.StoreSinkWriteState;
 import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
-import org.apache.paimon.memory.HeapMemorySegmentPool;
-import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
@@ -70,11 +68,10 @@ public class CdcRecordStoreMultiWriteOperator
 
     private static final long serialVersionUID = 1L;
 
-    private final StoreSinkWrite.WithWriteBufferProvider 
storeSinkWriteProvider;
+    private final StoreSinkWrite.Provider storeSinkWriteProvider;
     private final String initialCommitUser;
     private final CatalogLoader catalogLoader;
 
-    private MemoryPoolFactory memoryPoolFactory;
     private Catalog catalog;
     private Map<Identifier, FileStoreTable> tables;
     private StoreSinkWriteState state;
@@ -85,7 +82,7 @@ public class CdcRecordStoreMultiWriteOperator
     private CdcRecordStoreMultiWriteOperator(
             StreamOperatorParameters<MultiTableCommittable> parameters,
             CatalogLoader catalogLoader,
-            StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
+            StoreSinkWrite.Provider storeSinkWriteProvider,
             String initialCommitUser,
             Options options) {
         super(parameters, options);
@@ -134,19 +131,6 @@ public class CdcRecordStoreMultiWriteOperator
         int retryCnt = 
table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
         boolean skipCorruptRecord = 
table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
 
-        // all table write should share one write buffer so that writers can 
preempt memory
-        // from those of other tables
-        if (memoryPoolFactory == null) {
-            memoryPoolFactory =
-                    new MemoryPoolFactory(
-                            memoryPool != null
-                                    ? memoryPool
-                                    // currently, the options of all tables 
are the same in CDC
-                                    : new HeapMemorySegmentPool(
-                                            
table.coreOptions().writeBufferSize(),
-                                            table.coreOptions().pageSize()));
-        }
-
         StoreSinkWrite write =
                 writes.computeIfAbsent(
                         tableId,
@@ -294,13 +278,13 @@ public class CdcRecordStoreMultiWriteOperator
     /** {@link StreamOperatorFactory} of {@link 
CdcRecordStoreMultiWriteOperator}. */
     public static class Factory
             extends PrepareCommitOperator.Factory<CdcMultiplexRecord, 
MultiTableCommittable> {
-        private final StoreSinkWrite.WithWriteBufferProvider 
storeSinkWriteProvider;
+        private final StoreSinkWrite.Provider storeSinkWriteProvider;
         private final String initialCommitUser;
         private final CatalogLoader catalogLoader;
 
         public Factory(
                 CatalogLoader catalogLoader,
-                StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
+                StoreSinkWrite.Provider storeSinkWriteProvider,
                 String initialCommitUser,
                 Options options) {
             super(options);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index a67e74ef55..c9ae95b698 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -91,7 +91,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
         this.tableFilter = tableFilter;
     }
 
-    private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
+    private StoreSinkWrite.Provider createWriteProvider() {
         // for now, no compaction for multiplexed sink
         return (table, commitUser, state, ioManager, memoryPoolFactory, 
metricGroup) ->
                 new StoreSinkWriteImpl(
@@ -118,7 +118,7 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
     public DataStreamSink<?> sinkFrom(
             DataStream<CdcMultiplexRecord> input,
             String commitUser,
-            StoreSinkWrite.WithWriteBufferProvider sinkProvider) {
+            StoreSinkWrite.Provider sinkProvider) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         assertStreamingConfiguration(env);
         MultiTableCommittableTypeInfo typeInfo = new 
MultiTableCommittableTypeInfo();
@@ -151,8 +151,7 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
     }
 
     protected OneInputStreamOperatorFactory<CdcMultiplexRecord, 
MultiTableCommittable>
-            createWriteOperator(
-                    StoreSinkWrite.WithWriteBufferProvider writeProvider, 
String commitUser) {
+            createWriteOperator(StoreSinkWrite.Provider writeProvider, String 
commitUser) {
         return new CdcRecordStoreMultiWriteOperator.Factory(
                 catalogLoader, writeProvider, commitUser, new Options());
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index f00229d998..d27b919fa1 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -256,7 +256,7 @@ public class CdcRecordStoreWriteOperatorTest {
         CdcRecordStoreWriteOperator.Factory operatorFactory =
                 new CdcRecordStoreWriteOperator.Factory(
                         table,
-                        (t, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
+                        (t, commitUser, state, ioManager, memoryPoolFactory, 
metricGroup) ->
                                 new StoreSinkWriteImpl(
                                         t,
                                         commitUser,
@@ -265,7 +265,7 @@ public class CdcRecordStoreWriteOperatorTest {
                                         false,
                                         false,
                                         true,
-                                        memoryPool,
+                                        memoryPoolFactory,
                                         metricGroup),
                         commitUser);
         TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 62341a180d..a5702d7c6d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.utils.SnapshotManager;
@@ -74,7 +74,7 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
             boolean waitCompaction,
             int deltaCommits,
             boolean isStreaming,
-            @Nullable MemorySegmentPool memoryPool,
+            MemoryPoolFactory memoryPoolFactory,
             MetricGroup metricGroup) {
         super(
                 table,
@@ -84,7 +84,7 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
                 ignorePreviousFiles,
                 waitCompaction,
                 isStreaming,
-                memoryPool,
+                memoryPoolFactory,
                 metricGroup);
 
         this.deltaCommits = deltaCommits;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
index e6f683d4b1..e8b27083b0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
@@ -19,15 +19,13 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +46,7 @@ public class LookupSinkWrite extends StoreSinkWriteImpl {
             boolean ignorePreviousFiles,
             boolean waitCompaction,
             boolean isStreaming,
-            @Nullable MemorySegmentPool memoryPool,
+            MemoryPoolFactory memoryPoolFactory,
             MetricGroup metricGroup) {
         super(
                 table,
@@ -58,7 +56,7 @@ public class LookupSinkWrite extends StoreSinkWriteImpl {
                 ignorePreviousFiles,
                 waitCompaction,
                 isStreaming,
-                memoryPool,
+                memoryPoolFactory,
                 metricGroup);
 
         this.tableName = table.name();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 3687fea6f1..7d7ba29760 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -165,7 +165,7 @@ public class MultiTablesStoreCompactOperator
                                         commitUser,
                                         state,
                                         
getContainingTask().getEnvironment().getIOManager(),
-                                        memoryPool,
+                                        memoryPoolFactory,
                                         getMetricGroup()));
 
         if (write.streamingMode()) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
index e66819398e..35f5ff15b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
@@ -18,8 +18,11 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.memory.FlinkMemorySegmentPool;
 import org.apache.paimon.flink.memory.MemorySegmentAllocator;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.Options;
 
@@ -51,7 +54,7 @@ public abstract class PrepareCommitOperator<IN, OUT> extends 
AbstractStreamOpera
 
     private static final long serialVersionUID = 1L;
 
-    @Nullable protected transient MemorySegmentPool memoryPool;
+    protected transient MemoryPoolFactory memoryPoolFactory;
     @Nullable private transient MemorySegmentAllocator memoryAllocator;
     protected final Options options;
     private boolean endOfInput = false;
@@ -67,6 +70,8 @@ public abstract class PrepareCommitOperator<IN, OUT> extends 
AbstractStreamOpera
             StreamConfig config,
             Output<StreamRecord<OUT>> output) {
         super.setup(containingTask, config, output);
+
+        MemorySegmentPool memoryPool;
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
             MemoryManager memoryManager = 
containingTask.getEnvironment().getMemoryManager();
             memoryAllocator = new MemorySegmentAllocator(containingTask, 
memoryManager);
@@ -75,7 +80,13 @@ public abstract class PrepareCommitOperator<IN, OUT> extends 
AbstractStreamOpera
                             computeManagedMemory(this),
                             memoryManager.getPageSize(),
                             memoryAllocator);
+        } else {
+            CoreOptions coreOptions = new CoreOptions(options);
+            memoryPool =
+                    new HeapMemorySegmentPool(
+                            coreOptions.writeBufferSize(), 
coreOptions.pageSize());
         }
+        memoryPoolFactory = new MemoryPoolFactory(memoryPool);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 7676084b94..989c8e91dc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -117,7 +117,7 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
                         commitUser,
                         state,
                         getContainingTask().getEnvironment().getIOManager(),
-                        memoryPool,
+                        memoryPoolFactory,
                         getMetricGroup());
         this.writeRefresher = WriterRefresher.create(write.streamingMode(), 
table, write::replace);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index d88cbea6f0..a7b1434740 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.operation.WriteRestore;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
@@ -96,7 +95,7 @@ public interface StoreSinkWrite {
                 String commitUser,
                 StoreSinkWriteState state,
                 IOManager ioManager,
-                @Nullable MemorySegmentPool memoryPool,
+                MemoryPoolFactory memoryPoolFactory,
                 @Nullable MetricGroup metricGroup);
     }
 
@@ -141,7 +140,7 @@ public interface StoreSinkWrite {
             if (changelogProducer == 
CoreOptions.ChangelogProducer.FULL_COMPACTION
                     || deltaCommits >= 0) {
                 int finalDeltaCommits = Math.max(deltaCommits, 1);
-                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
+                return (table, commitUser, state, ioManager, 
memoryPoolFactory, metricGroup) -> {
                     assertNoSinkMaterializer.run();
                     return new GlobalFullCompactionSinkWrite(
                             table,
@@ -152,13 +151,13 @@ public interface StoreSinkWrite {
                             waitCompaction,
                             finalDeltaCommits,
                             isStreaming,
-                            memoryPool,
+                            memoryPoolFactory,
                             metricGroup);
                 };
             }
 
             if (coreOptions.needLookup()) {
-                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
+                return (table, commitUser, state, ioManager, 
memoryPoolFactory, metricGroup) -> {
                     assertNoSinkMaterializer.run();
                     return new LookupSinkWrite(
                             table,
@@ -168,13 +167,13 @@ public interface StoreSinkWrite {
                             ignorePreviousFiles,
                             waitCompaction,
                             isStreaming,
-                            memoryPool,
+                            memoryPoolFactory,
                             metricGroup);
                 };
             }
         }
 
-        return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
-> {
+        return (table, commitUser, state, ioManager, memoryPoolFactory, 
metricGroup) -> {
             assertNoSinkMaterializer.run();
             return new StoreSinkWriteImpl(
                     table,
@@ -184,25 +183,8 @@ public interface StoreSinkWrite {
                     ignorePreviousFiles,
                     waitCompaction,
                     isStreaming,
-                    memoryPool,
+                    memoryPoolFactory,
                     metricGroup);
         };
     }
-
-    /** Provider of {@link StoreSinkWrite} that uses given write buffer. */
-    @FunctionalInterface
-    interface WithWriteBufferProvider extends Serializable {
-
-        /**
-         * TODO: The argument list has become too complicated. Build {@link 
TableWriteImpl} directly
-         * in caller and simplify the argument list.
-         */
-        StoreSinkWrite provide(
-                FileStoreTable table,
-                String commitUser,
-                StoreSinkWriteState state,
-                IOManager ioManager,
-                @Nullable MemoryPoolFactory memoryPoolFactory,
-                MetricGroup metricGroup);
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 9c65f221aa..2d3742a95a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -24,9 +24,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.operation.WriteRestore;
 import org.apache.paimon.table.FileStoreTable;
@@ -46,8 +44,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** Default implementation of {@link StoreSinkWrite}. This writer does not 
have states. */
 public class StoreSinkWriteImpl implements StoreSinkWrite {
 
@@ -59,35 +55,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
     private final boolean ignorePreviousFiles;
     private final boolean waitCompaction;
     private final boolean isStreamingMode;
-    @Nullable private final MemorySegmentPool memoryPool;
-    @Nullable private final MemoryPoolFactory memoryPoolFactory;
-
-    protected TableWriteImpl<?> write;
-
+    private final MemoryPoolFactory memoryPoolFactory;
     @Nullable private final MetricGroup metricGroup;
 
-    public StoreSinkWriteImpl(
-            FileStoreTable table,
-            String commitUser,
-            StoreSinkWriteState state,
-            IOManager ioManager,
-            boolean ignorePreviousFiles,
-            boolean waitCompaction,
-            boolean isStreamingMode,
-            @Nullable MemorySegmentPool memoryPool,
-            @Nullable MetricGroup metricGroup) {
-        this(
-                table,
-                commitUser,
-                state,
-                ioManager,
-                ignorePreviousFiles,
-                waitCompaction,
-                isStreamingMode,
-                memoryPool,
-                null,
-                metricGroup);
-    }
+    protected TableWriteImpl<?> write;
 
     public StoreSinkWriteImpl(
             FileStoreTable table,
@@ -99,67 +70,29 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             boolean isStreamingMode,
             MemoryPoolFactory memoryPoolFactory,
             @Nullable MetricGroup metricGroup) {
-        this(
-                table,
-                commitUser,
-                state,
-                ioManager,
-                ignorePreviousFiles,
-                waitCompaction,
-                isStreamingMode,
-                null,
-                memoryPoolFactory,
-                metricGroup);
-    }
-
-    private StoreSinkWriteImpl(
-            FileStoreTable table,
-            String commitUser,
-            StoreSinkWriteState state,
-            IOManager ioManager,
-            boolean ignorePreviousFiles,
-            boolean waitCompaction,
-            boolean isStreamingMode,
-            @Nullable MemorySegmentPool memoryPool,
-            @Nullable MemoryPoolFactory memoryPoolFactory,
-            @Nullable MetricGroup metricGroup) {
         this.commitUser = commitUser;
         this.state = state;
         this.paimonIOManager = new 
IOManagerImpl(ioManager.getSpillingDirectoriesPaths());
         this.ignorePreviousFiles = ignorePreviousFiles;
         this.waitCompaction = waitCompaction;
         this.isStreamingMode = isStreamingMode;
-        this.memoryPool = memoryPool;
         this.memoryPoolFactory = memoryPoolFactory;
         this.metricGroup = metricGroup;
         this.write = newTableWrite(table);
     }
 
     private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
-        checkArgument(
-                !(memoryPool != null && memoryPoolFactory != null),
-                "memoryPool and memoryPoolFactory cannot be set at the same 
time.");
-
         TableWriteImpl<?> tableWrite =
                 table.newWrite(commitUser, state.getSubtaskId())
                         .withIOManager(paimonIOManager)
                         .withIgnorePreviousFiles(ignorePreviousFiles)
-                        .withBucketMode(table.bucketMode());
+                        .withBucketMode(table.bucketMode())
+                        .withMemoryPoolFactory(memoryPoolFactory);
 
         if (metricGroup != null) {
             tableWrite.withMetricRegistry(new 
FlinkMetricRegistry(metricGroup));
         }
-
-        if (memoryPoolFactory != null) {
-            return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
-        } else {
-            return tableWrite.withMemoryPool(
-                    memoryPool != null
-                            ? memoryPool
-                            : new HeapMemorySegmentPool(
-                                    table.coreOptions().writeBufferSize(),
-                                    table.coreOptions().pageSize()));
-        }
+        return tableWrite;
     }
 
     public void withCompactExecutor(ExecutorService compactExecutor) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index ca27059265..caf92cb742 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -94,7 +94,7 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
                         getCommitUser(context),
                         state,
                         getContainingTask().getEnvironment().getIOManager(),
-                        memoryPool,
+                        memoryPoolFactory,
                         getMetricGroup());
         if (writeRestore != null) {
             write.setWriteRestore(writeRestore);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 39664c3aa3..7b058328cb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -19,11 +19,6 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogLoader;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
@@ -31,7 +26,6 @@ import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -49,13 +43,8 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -70,7 +59,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
-import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */
@@ -216,98 +204,4 @@ public class CompactorSinkITCase extends AbstractTestBase {
                                 ""));
         return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
     }
-
-    private FileStoreTable createCatalogTable(Catalog catalog, Identifier tableIdentifier)
-            throws Exception {
-        Schema tableSchema =
-                new Schema(
-                        ROW_TYPE.getFields(),
-                        Collections.emptyList(),
-                        Collections.singletonList("k"),
-                        Collections.singletonMap("bucket", "1"),
-                        "");
-        catalog.createTable(tableIdentifier, tableSchema, false);
-        return (FileStoreTable) catalog.getTable(tableIdentifier);
-    }
-
-    private OneInputStreamOperatorTestHarness<RowData, Committable> createTestHarness(
-            OneInputStreamOperator<RowData, Committable> operator) throws Exception {
-        TypeSerializer<Committable> serializer =
-                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
-        OneInputStreamOperatorTestHarness<RowData, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator);
-        harness.setup(serializer);
-        return harness;
-    }
-
-    private OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
-            createMultiTablesTestHarness(
-                    OneInputStreamOperator<RowData, MultiTableCommittable> operator)
-                    throws Exception {
-        TypeSerializer<MultiTableCommittable> serializer =
-                new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
-        OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable> harness =
-                new OneInputStreamOperatorTestHarness<>(operator);
-        harness.setup(serializer);
-        return harness;
-    }
-
-    protected StoreCompactOperator.Factory createCompactOperator(FileStoreTable table) {
-        return new StoreCompactOperator.Factory(
-                table,
-                (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
-                        new StoreSinkWriteImpl(
-                                t,
-                                commitUser,
-                                state,
-                                ioManager,
-                                false,
-                                false,
-                                false,
-                                memoryPool,
-                                metricGroup),
-                "test",
-                true);
-    }
-
-    protected MultiTablesStoreCompactOperator.Factory createMultiTablesCompactOperator(
-            CatalogLoader catalogLoader) throws Exception {
-        return new MultiTablesStoreCompactOperator.Factory(
-                catalogLoader,
-                commitUser,
-                new CheckpointConfig(),
-                false,
-                false,
-                true,
-                new Options());
-    }
-
-    private static byte[] partition(String dt, int hh) {
-        BinaryRow row = new BinaryRow(2);
-        BinaryRowWriter writer = new BinaryRowWriter(row);
-        writer.writeString(0, BinaryString.fromString(dt));
-        writer.writeInt(1, hh);
-        writer.complete();
-        return serializeBinaryRow(row);
-    }
-
-    private void prepareDataFile(FileStoreTable table) throws Exception {
-        StreamWriteBuilder streamWriteBuilder =
-                table.newStreamWriteBuilder().withCommitUser(commitUser);
-        StreamTableWrite write = streamWriteBuilder.newWrite();
-        StreamTableCommit commit = streamWriteBuilder.newCommit();
-
-        write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
-        write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
-        write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(2, 200, 15, BinaryString.fromString("20221208")));
-        write.write(rowData(2, 200, 16, BinaryString.fromString("20221208")));
-        write.write(rowData(2, 200, 15, BinaryString.fromString("20221209")));
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        write.close();
-        commit.close();
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index 0243270b6e..f90674c668 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -55,7 +55,7 @@ public class StoreCompactOperatorTest extends TableTestBase {
         StoreCompactOperator.Factory operatorFactory =
                 new StoreCompactOperator.Factory(
                         getTableDefault(),
-                        (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+                        (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
                                 compactRememberStoreWrite,
                         "10086",
                         !streamingMode);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 757d25de1c..109a3f984e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -550,7 +550,7 @@ public class WriterOperatorTest {
         return new RowDataStoreWriteOperator.Factory(
                 fileStoreTable,
                 null,
-                (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+                (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
                         new LookupSinkWrite(
                                 table,
                                 commitUser,
@@ -559,7 +559,7 @@ public class WriterOperatorTest {
                                 false,
                                 waitCompaction,
                                 true,
-                                memoryPool,
+                                memoryPoolFactory,
                                 metricGroup),
                 commitUser);
     }


Reply via email to