This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 801e18db4b [core][flink] Remove withMemoryPool in TableWriteImpl
(#6265)
801e18db4b is described below
commit 801e18db4bb3dfb0e9ba604f97306a6d21a92151
Author: tsreaper <[email protected]>
AuthorDate: Tue Sep 16 16:30:40 2025 +0800
[core][flink] Remove withMemoryPool in TableWriteImpl (#6265)
---
.../apache/paimon/operation/FileStoreWrite.java | 10 --
.../org/apache/paimon/table/sink/TableWrite.java | 7 +-
.../apache/paimon/table/sink/TableWriteImpl.java | 6 --
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 24 +----
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 7 +-
.../sink/cdc/CdcRecordStoreWriteOperatorTest.java | 4 +-
.../flink/sink/GlobalFullCompactionSinkWrite.java | 6 +-
.../apache/paimon/flink/sink/LookupSinkWrite.java | 8 +-
.../sink/MultiTablesStoreCompactOperator.java | 2 +-
.../paimon/flink/sink/PrepareCommitOperator.java | 13 ++-
.../paimon/flink/sink/StoreCompactOperator.java | 2 +-
.../apache/paimon/flink/sink/StoreSinkWrite.java | 32 ++-----
.../paimon/flink/sink/StoreSinkWriteImpl.java | 77 +--------------
.../paimon/flink/sink/TableWriteOperator.java | 2 +-
.../paimon/flink/sink/CompactorSinkITCase.java | 106 ---------------------
.../flink/sink/StoreCompactOperatorTest.java | 2 +-
.../paimon/flink/sink/WriterOperatorTest.java | 4 +-
17 files changed, 51 insertions(+), 261 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index b268f7b7a5..ce71b8cd2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -25,7 +25,6 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
@@ -58,15 +57,6 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
throw new UnsupportedOperationException();
}
- /**
- * With memory pool for the current file store write.
- *
- * @param memoryPool the given memory pool.
- */
- default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
- return withMemoryPoolFactory(new MemoryPoolFactory(memoryPool));
- }
-
/**
* With memory pool factory for the current file store write.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 73956c30e0..a36f314cd6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.Table;
@@ -43,7 +44,11 @@ public interface TableWrite extends AutoCloseable {
TableWrite withWriteType(RowType writeType);
/** With {@link MemorySegmentPool} for the current table write. */
- TableWrite withMemoryPool(MemorySegmentPool memoryPool);
+ default TableWrite withMemoryPool(MemorySegmentPool memoryPool) {
+ return withMemoryPoolFactory(new MemoryPoolFactory(memoryPool));
+ }
+
+ TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory);
/** Calculate which partition {@code row} belongs to. */
BinaryRow getPartition(InternalRow row);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 99e30bcccf..325911b0ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -27,7 +27,6 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.BundleFileStoreWriter;
import org.apache.paimon.operation.FileStoreWrite;
@@ -120,11 +119,6 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
}
@Override
- public TableWriteImpl<T> withMemoryPool(MemorySegmentPool memoryPool) {
- write.withMemoryPool(memoryPool);
- return this;
- }
-
public TableWriteImpl<T> withMemoryPoolFactory(MemoryPoolFactory
memoryPoolFactory) {
write.withMemoryPoolFactory(memoryPoolFactory);
return this;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index ea6008d872..da612bd337 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -31,8 +31,6 @@ import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
-import org.apache.paimon.memory.HeapMemorySegmentPool;
-import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -70,11 +68,10 @@ public class CdcRecordStoreMultiWriteOperator
private static final long serialVersionUID = 1L;
- private final StoreSinkWrite.WithWriteBufferProvider
storeSinkWriteProvider;
+ private final StoreSinkWrite.Provider storeSinkWriteProvider;
private final String initialCommitUser;
private final CatalogLoader catalogLoader;
- private MemoryPoolFactory memoryPoolFactory;
private Catalog catalog;
private Map<Identifier, FileStoreTable> tables;
private StoreSinkWriteState state;
@@ -85,7 +82,7 @@ public class CdcRecordStoreMultiWriteOperator
private CdcRecordStoreMultiWriteOperator(
StreamOperatorParameters<MultiTableCommittable> parameters,
CatalogLoader catalogLoader,
- StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
+ StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
super(parameters, options);
@@ -134,19 +131,6 @@ public class CdcRecordStoreMultiWriteOperator
int retryCnt =
table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
boolean skipCorruptRecord =
table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
- // all table write should share one write buffer so that writers can
preempt memory
- // from those of other tables
- if (memoryPoolFactory == null) {
- memoryPoolFactory =
- new MemoryPoolFactory(
- memoryPool != null
- ? memoryPool
- // currently, the options of all tables
are the same in CDC
- : new HeapMemorySegmentPool(
-
table.coreOptions().writeBufferSize(),
- table.coreOptions().pageSize()));
- }
-
StoreSinkWrite write =
writes.computeIfAbsent(
tableId,
@@ -294,13 +278,13 @@ public class CdcRecordStoreMultiWriteOperator
/** {@link StreamOperatorFactory} of {@link
CdcRecordStoreMultiWriteOperator}. */
public static class Factory
extends PrepareCommitOperator.Factory<CdcMultiplexRecord,
MultiTableCommittable> {
- private final StoreSinkWrite.WithWriteBufferProvider
storeSinkWriteProvider;
+ private final StoreSinkWrite.Provider storeSinkWriteProvider;
private final String initialCommitUser;
private final CatalogLoader catalogLoader;
public Factory(
CatalogLoader catalogLoader,
- StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
+ StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
super(options);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index a67e74ef55..c9ae95b698 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -91,7 +91,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
this.tableFilter = tableFilter;
}
- private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
+ private StoreSinkWrite.Provider createWriteProvider() {
// for now, no compaction for multiplexed sink
return (table, commitUser, state, ioManager, memoryPoolFactory,
metricGroup) ->
new StoreSinkWriteImpl(
@@ -118,7 +118,7 @@ public class FlinkCdcMultiTableSink implements Serializable
{
public DataStreamSink<?> sinkFrom(
DataStream<CdcMultiplexRecord> input,
String commitUser,
- StoreSinkWrite.WithWriteBufferProvider sinkProvider) {
+ StoreSinkWrite.Provider sinkProvider) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
assertStreamingConfiguration(env);
MultiTableCommittableTypeInfo typeInfo = new
MultiTableCommittableTypeInfo();
@@ -151,8 +151,7 @@ public class FlinkCdcMultiTableSink implements Serializable
{
}
protected OneInputStreamOperatorFactory<CdcMultiplexRecord,
MultiTableCommittable>
- createWriteOperator(
- StoreSinkWrite.WithWriteBufferProvider writeProvider,
String commitUser) {
+ createWriteOperator(StoreSinkWrite.Provider writeProvider, String
commitUser) {
return new CdcRecordStoreMultiWriteOperator.Factory(
catalogLoader, writeProvider, commitUser, new Options());
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index f00229d998..d27b919fa1 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -256,7 +256,7 @@ public class CdcRecordStoreWriteOperatorTest {
CdcRecordStoreWriteOperator.Factory operatorFactory =
new CdcRecordStoreWriteOperator.Factory(
table,
- (t, commitUser, state, ioManager, memoryPool,
metricGroup) ->
+ (t, commitUser, state, ioManager, memoryPoolFactory,
metricGroup) ->
new StoreSinkWriteImpl(
t,
commitUser,
@@ -265,7 +265,7 @@ public class CdcRecordStoreWriteOperatorTest {
false,
false,
true,
- memoryPool,
+ memoryPoolFactory,
metricGroup),
commitUser);
TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 62341a180d..a5702d7c6d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.SnapshotManager;
@@ -74,7 +74,7 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
boolean waitCompaction,
int deltaCommits,
boolean isStreaming,
- @Nullable MemorySegmentPool memoryPool,
+ MemoryPoolFactory memoryPoolFactory,
MetricGroup metricGroup) {
super(
table,
@@ -84,7 +84,7 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
ignorePreviousFiles,
waitCompaction,
isStreaming,
- memoryPool,
+ memoryPoolFactory,
metricGroup);
this.deltaCommits = deltaCommits;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
index e6f683d4b1..e8b27083b0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java
@@ -19,15 +19,13 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -48,7 +46,7 @@ public class LookupSinkWrite extends StoreSinkWriteImpl {
boolean ignorePreviousFiles,
boolean waitCompaction,
boolean isStreaming,
- @Nullable MemorySegmentPool memoryPool,
+ MemoryPoolFactory memoryPoolFactory,
MetricGroup metricGroup) {
super(
table,
@@ -58,7 +56,7 @@ public class LookupSinkWrite extends StoreSinkWriteImpl {
ignorePreviousFiles,
waitCompaction,
isStreaming,
- memoryPool,
+ memoryPoolFactory,
metricGroup);
this.tableName = table.name();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 3687fea6f1..7d7ba29760 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -165,7 +165,7 @@ public class MultiTablesStoreCompactOperator
commitUser,
state,
getContainingTask().getEnvironment().getIOManager(),
- memoryPool,
+ memoryPoolFactory,
getMetricGroup()));
if (write.streamingMode()) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
index e66819398e..35f5ff15b9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java
@@ -18,8 +18,11 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.memory.FlinkMemorySegmentPool;
import org.apache.paimon.flink.memory.MemorySegmentAllocator;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.Options;
@@ -51,7 +54,7 @@ public abstract class PrepareCommitOperator<IN, OUT> extends
AbstractStreamOpera
private static final long serialVersionUID = 1L;
- @Nullable protected transient MemorySegmentPool memoryPool;
+ protected transient MemoryPoolFactory memoryPoolFactory;
@Nullable private transient MemorySegmentAllocator memoryAllocator;
protected final Options options;
private boolean endOfInput = false;
@@ -67,6 +70,8 @@ public abstract class PrepareCommitOperator<IN, OUT> extends
AbstractStreamOpera
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
+
+ MemorySegmentPool memoryPool;
if (options.get(SINK_USE_MANAGED_MEMORY)) {
MemoryManager memoryManager =
containingTask.getEnvironment().getMemoryManager();
memoryAllocator = new MemorySegmentAllocator(containingTask,
memoryManager);
@@ -75,7 +80,13 @@ public abstract class PrepareCommitOperator<IN, OUT> extends
AbstractStreamOpera
computeManagedMemory(this),
memoryManager.getPageSize(),
memoryAllocator);
+ } else {
+ CoreOptions coreOptions = new CoreOptions(options);
+ memoryPool =
+ new HeapMemorySegmentPool(
+ coreOptions.writeBufferSize(),
coreOptions.pageSize());
}
+ memoryPoolFactory = new MemoryPoolFactory(memoryPool);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 7676084b94..989c8e91dc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -117,7 +117,7 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
commitUser,
state,
getContainingTask().getEnvironment().getIOManager(),
- memoryPool,
+ memoryPoolFactory,
getMetricGroup());
this.writeRefresher = WriterRefresher.create(write.streamingMode(),
table, write::replace);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index d88cbea6f0..a7b1434740 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
@@ -96,7 +95,7 @@ public interface StoreSinkWrite {
String commitUser,
StoreSinkWriteState state,
IOManager ioManager,
- @Nullable MemorySegmentPool memoryPool,
+ MemoryPoolFactory memoryPoolFactory,
@Nullable MetricGroup metricGroup);
}
@@ -141,7 +140,7 @@ public interface StoreSinkWrite {
if (changelogProducer ==
CoreOptions.ChangelogProducer.FULL_COMPACTION
|| deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
- return (table, commitUser, state, ioManager, memoryPool,
metricGroup) -> {
+ return (table, commitUser, state, ioManager,
memoryPoolFactory, metricGroup) -> {
assertNoSinkMaterializer.run();
return new GlobalFullCompactionSinkWrite(
table,
@@ -152,13 +151,13 @@ public interface StoreSinkWrite {
waitCompaction,
finalDeltaCommits,
isStreaming,
- memoryPool,
+ memoryPoolFactory,
metricGroup);
};
}
if (coreOptions.needLookup()) {
- return (table, commitUser, state, ioManager, memoryPool,
metricGroup) -> {
+ return (table, commitUser, state, ioManager,
memoryPoolFactory, metricGroup) -> {
assertNoSinkMaterializer.run();
return new LookupSinkWrite(
table,
@@ -168,13 +167,13 @@ public interface StoreSinkWrite {
ignorePreviousFiles,
waitCompaction,
isStreaming,
- memoryPool,
+ memoryPoolFactory,
metricGroup);
};
}
}
- return (table, commitUser, state, ioManager, memoryPool, metricGroup)
-> {
+ return (table, commitUser, state, ioManager, memoryPoolFactory,
metricGroup) -> {
assertNoSinkMaterializer.run();
return new StoreSinkWriteImpl(
table,
@@ -184,25 +183,8 @@ public interface StoreSinkWrite {
ignorePreviousFiles,
waitCompaction,
isStreaming,
- memoryPool,
+ memoryPoolFactory,
metricGroup);
};
}
-
- /** Provider of {@link StoreSinkWrite} that uses given write buffer. */
- @FunctionalInterface
- interface WithWriteBufferProvider extends Serializable {
-
- /**
- * TODO: The argument list has become too complicated. Build {@link
TableWriteImpl} directly
- * in caller and simplify the argument list.
- */
- StoreSinkWrite provide(
- FileStoreTable table,
- String commitUser,
- StoreSinkWriteState state,
- IOManager ioManager,
- @Nullable MemoryPoolFactory memoryPoolFactory,
- MetricGroup metricGroup);
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 9c65f221aa..2d3742a95a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -24,9 +24,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
-import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.table.FileStoreTable;
@@ -46,8 +44,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** Default implementation of {@link StoreSinkWrite}. This writer does not
have states. */
public class StoreSinkWriteImpl implements StoreSinkWrite {
@@ -59,35 +55,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
private final boolean ignorePreviousFiles;
private final boolean waitCompaction;
private final boolean isStreamingMode;
- @Nullable private final MemorySegmentPool memoryPool;
- @Nullable private final MemoryPoolFactory memoryPoolFactory;
-
- protected TableWriteImpl<?> write;
-
+ private final MemoryPoolFactory memoryPoolFactory;
@Nullable private final MetricGroup metricGroup;
- public StoreSinkWriteImpl(
- FileStoreTable table,
- String commitUser,
- StoreSinkWriteState state,
- IOManager ioManager,
- boolean ignorePreviousFiles,
- boolean waitCompaction,
- boolean isStreamingMode,
- @Nullable MemorySegmentPool memoryPool,
- @Nullable MetricGroup metricGroup) {
- this(
- table,
- commitUser,
- state,
- ioManager,
- ignorePreviousFiles,
- waitCompaction,
- isStreamingMode,
- memoryPool,
- null,
- metricGroup);
- }
+ protected TableWriteImpl<?> write;
public StoreSinkWriteImpl(
FileStoreTable table,
@@ -99,67 +70,29 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
boolean isStreamingMode,
MemoryPoolFactory memoryPoolFactory,
@Nullable MetricGroup metricGroup) {
- this(
- table,
- commitUser,
- state,
- ioManager,
- ignorePreviousFiles,
- waitCompaction,
- isStreamingMode,
- null,
- memoryPoolFactory,
- metricGroup);
- }
-
- private StoreSinkWriteImpl(
- FileStoreTable table,
- String commitUser,
- StoreSinkWriteState state,
- IOManager ioManager,
- boolean ignorePreviousFiles,
- boolean waitCompaction,
- boolean isStreamingMode,
- @Nullable MemorySegmentPool memoryPool,
- @Nullable MemoryPoolFactory memoryPoolFactory,
- @Nullable MetricGroup metricGroup) {
this.commitUser = commitUser;
this.state = state;
this.paimonIOManager = new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths());
this.ignorePreviousFiles = ignorePreviousFiles;
this.waitCompaction = waitCompaction;
this.isStreamingMode = isStreamingMode;
- this.memoryPool = memoryPool;
this.memoryPoolFactory = memoryPoolFactory;
this.metricGroup = metricGroup;
this.write = newTableWrite(table);
}
private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
- checkArgument(
- !(memoryPool != null && memoryPoolFactory != null),
- "memoryPool and memoryPoolFactory cannot be set at the same
time.");
-
TableWriteImpl<?> tableWrite =
table.newWrite(commitUser, state.getSubtaskId())
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
- .withBucketMode(table.bucketMode());
+ .withBucketMode(table.bucketMode())
+ .withMemoryPoolFactory(memoryPoolFactory);
if (metricGroup != null) {
tableWrite.withMetricRegistry(new
FlinkMetricRegistry(metricGroup));
}
-
- if (memoryPoolFactory != null) {
- return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
- } else {
- return tableWrite.withMemoryPool(
- memoryPool != null
- ? memoryPool
- : new HeapMemorySegmentPool(
- table.coreOptions().writeBufferSize(),
- table.coreOptions().pageSize()));
- }
+ return tableWrite;
}
public void withCompactExecutor(ExecutorService compactExecutor) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index ca27059265..caf92cb742 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -94,7 +94,7 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
getCommitUser(context),
state,
getContainingTask().getEnvironment().getIOManager(),
- memoryPool,
+ memoryPoolFactory,
getMetricGroup());
if (writeRestore != null) {
write.setWriteRestore(writeRestore);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 39664c3aa3..7b058328cb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -19,11 +19,6 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogLoader;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
@@ -31,7 +26,6 @@ import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -49,13 +43,8 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -70,7 +59,6 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
-import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */
@@ -216,98 +204,4 @@ public class CompactorSinkITCase extends AbstractTestBase {
""));
return FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
}
-
- private FileStoreTable createCatalogTable(Catalog catalog, Identifier
tableIdentifier)
- throws Exception {
- Schema tableSchema =
- new Schema(
- ROW_TYPE.getFields(),
- Collections.emptyList(),
- Collections.singletonList("k"),
- Collections.singletonMap("bucket", "1"),
- "");
- catalog.createTable(tableIdentifier, tableSchema, false);
- return (FileStoreTable) catalog.getTable(tableIdentifier);
- }
-
- private OneInputStreamOperatorTestHarness<RowData, Committable>
createTestHarness(
- OneInputStreamOperator<RowData, Committable> operator) throws
Exception {
- TypeSerializer<Committable> serializer =
- new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
- OneInputStreamOperatorTestHarness<RowData, Committable> harness =
- new OneInputStreamOperatorTestHarness<>(operator);
- harness.setup(serializer);
- return harness;
- }
-
- private OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
- createMultiTablesTestHarness(
- OneInputStreamOperator<RowData, MultiTableCommittable>
operator)
- throws Exception {
- TypeSerializer<MultiTableCommittable> serializer =
- new MultiTableCommittableTypeInfo().createSerializer(new
ExecutionConfig());
- OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
harness =
- new OneInputStreamOperatorTestHarness<>(operator);
- harness.setup(serializer);
- return harness;
- }
-
- protected StoreCompactOperator.Factory
createCompactOperator(FileStoreTable table) {
- return new StoreCompactOperator.Factory(
- table,
- (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
- new StoreSinkWriteImpl(
- t,
- commitUser,
- state,
- ioManager,
- false,
- false,
- false,
- memoryPool,
- metricGroup),
- "test",
- true);
- }
-
- protected MultiTablesStoreCompactOperator.Factory
createMultiTablesCompactOperator(
- CatalogLoader catalogLoader) throws Exception {
- return new MultiTablesStoreCompactOperator.Factory(
- catalogLoader,
- commitUser,
- new CheckpointConfig(),
- false,
- false,
- true,
- new Options());
- }
-
- private static byte[] partition(String dt, int hh) {
- BinaryRow row = new BinaryRow(2);
- BinaryRowWriter writer = new BinaryRowWriter(row);
- writer.writeString(0, BinaryString.fromString(dt));
- writer.writeInt(1, hh);
- writer.complete();
- return serializeBinaryRow(row);
- }
-
- private void prepareDataFile(FileStoreTable table) throws Exception {
- StreamWriteBuilder streamWriteBuilder =
- table.newStreamWriteBuilder().withCommitUser(commitUser);
- StreamTableWrite write = streamWriteBuilder.newWrite();
- StreamTableCommit commit = streamWriteBuilder.newCommit();
-
- write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
- write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
- write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(2, 200, 15, BinaryString.fromString("20221208")));
- write.write(rowData(2, 200, 16, BinaryString.fromString("20221208")));
- write.write(rowData(2, 200, 15, BinaryString.fromString("20221209")));
- commit.commit(1, write.prepareCommit(true, 1));
-
- write.close();
- commit.close();
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index 0243270b6e..f90674c668 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -55,7 +55,7 @@ public class StoreCompactOperatorTest extends TableTestBase {
StoreCompactOperator.Factory operatorFactory =
new StoreCompactOperator.Factory(
getTableDefault(),
- (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
+ (table, commitUser, state, ioManager,
memoryPoolFactory, metricGroup) ->
compactRememberStoreWrite,
"10086",
!streamingMode);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 757d25de1c..109a3f984e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -550,7 +550,7 @@ public class WriterOperatorTest {
return new RowDataStoreWriteOperator.Factory(
fileStoreTable,
null,
- (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
+ (table, commitUser, state, ioManager, memoryPoolFactory,
metricGroup) ->
new LookupSinkWrite(
table,
commitUser,
@@ -559,7 +559,7 @@ public class WriterOperatorTest {
false,
waitCompaction,
true,
- memoryPool,
+ memoryPoolFactory,
metricGroup),
commitUser);
}