This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 6ffe69651997c024e5d5e3122b19e602c854a74e Author: tsreaper <[email protected]> AuthorDate: Tue Sep 16 16:30:40 2025 +0800 [core][flink] Remove withMemoryPool in TableWriteImpl (#6265) --- .../apache/paimon/operation/FileStoreWrite.java | 10 -- .../org/apache/paimon/table/sink/TableWrite.java | 7 +- .../apache/paimon/table/sink/TableWriteImpl.java | 6 -- .../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 24 +---- .../flink/sink/cdc/FlinkCdcMultiTableSink.java | 7 +- .../sink/cdc/CdcRecordStoreWriteOperatorTest.java | 4 +- .../flink/sink/GlobalFullCompactionSinkWrite.java | 6 +- .../apache/paimon/flink/sink/LookupSinkWrite.java | 8 +- .../sink/MultiTablesStoreCompactOperator.java | 2 +- .../paimon/flink/sink/PrepareCommitOperator.java | 13 ++- .../paimon/flink/sink/StoreCompactOperator.java | 2 +- .../apache/paimon/flink/sink/StoreSinkWrite.java | 32 ++----- .../paimon/flink/sink/StoreSinkWriteImpl.java | 77 +-------------- .../paimon/flink/sink/TableWriteOperator.java | 2 +- .../paimon/flink/sink/CompactorSinkITCase.java | 106 --------------------- .../flink/sink/StoreCompactOperatorTest.java | 2 +- .../paimon/flink/sink/WriterOperatorTest.java | 4 +- 17 files changed, 51 insertions(+), 261 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java index b268f7b7a5..ce71b8cd2a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java @@ -25,7 +25,6 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.index.DynamicBucketIndexMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.memory.MemoryPoolFactory; -import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.SinkRecord; @@ -58,15 +57,6 @@ public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State< throw new UnsupportedOperationException(); } - /** - * With memory pool for the current file store write. - * - * @param memoryPool the given memory pool. - */ - default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) { - return withMemoryPoolFactory(new MemoryPoolFactory(memoryPool)); - } - /** * With memory pool factory for the current file store write. * diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java index 73956c30e0..a36f314cd6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.table.Table; @@ -43,7 +44,11 @@ public interface TableWrite extends AutoCloseable { TableWrite withWriteType(RowType writeType); /** With {@link MemorySegmentPool} for the current table write. */ - TableWrite withMemoryPool(MemorySegmentPool memoryPool); + default TableWrite withMemoryPool(MemorySegmentPool memoryPool) { + return withMemoryPoolFactory(new MemoryPoolFactory(memoryPool)); + } + + TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory); /** Calculate which partition {@code row} belongs to. */ BinaryRow getPartition(InternalRow row); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 99e30bcccf..325911b0ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -27,7 +27,6 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.BundleRecords; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.memory.MemoryPoolFactory; -import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.BundleFileStoreWriter; import org.apache.paimon.operation.FileStoreWrite; @@ -120,11 +119,6 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State } @Override - public TableWriteImpl<T> withMemoryPool(MemorySegmentPool memoryPool) { - write.withMemoryPool(memoryPool); - return this; - } - public TableWriteImpl<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) { write.withMemoryPoolFactory(memoryPoolFactory); return this; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java index ea6008d872..da612bd337 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java @@ -31,8 +31,6 @@ import org.apache.paimon.flink.sink.StoreSinkWriteImpl; import org.apache.paimon.flink.sink.StoreSinkWriteState; import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl; import org.apache.paimon.flink.utils.RuntimeContextUtils; -import org.apache.paimon.memory.HeapMemorySegmentPool; -import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.options.Options; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; @@ -70,11 +68,10 @@ public class CdcRecordStoreMultiWriteOperator private static final long serialVersionUID = 1L; - private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider; + private final StoreSinkWrite.Provider storeSinkWriteProvider; private final String initialCommitUser; private final CatalogLoader catalogLoader; - private MemoryPoolFactory memoryPoolFactory; private Catalog catalog; private Map<Identifier, FileStoreTable> tables; private StoreSinkWriteState state; @@ -85,7 +82,7 @@ public class CdcRecordStoreMultiWriteOperator private CdcRecordStoreMultiWriteOperator( StreamOperatorParameters<MultiTableCommittable> parameters, CatalogLoader catalogLoader, - StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider, + StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser, Options options) { super(parameters, options); @@ -134,19 +131,6 @@ public class CdcRecordStoreMultiWriteOperator int retryCnt = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES); boolean skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD); - // all table write should share one write buffer so that writers can preempt memory - // from those of other tables - if (memoryPoolFactory == null) { - memoryPoolFactory = - new MemoryPoolFactory( - memoryPool != null - ? memoryPool - // currently, the options of all tables are the same in CDC - : new HeapMemorySegmentPool( - table.coreOptions().writeBufferSize(), - table.coreOptions().pageSize())); - } - StoreSinkWrite write = writes.computeIfAbsent( tableId, @@ -294,13 +278,13 @@ public class CdcRecordStoreMultiWriteOperator /** {@link StreamOperatorFactory} of {@link CdcRecordStoreMultiWriteOperator}. */ public static class Factory extends PrepareCommitOperator.Factory<CdcMultiplexRecord, MultiTableCommittable> { - private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider; + private final StoreSinkWrite.Provider storeSinkWriteProvider; private final String initialCommitUser; private final CatalogLoader catalogLoader; public Factory( CatalogLoader catalogLoader, - StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider, + StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser, Options options) { super(options); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index a67e74ef55..c9ae95b698 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -91,7 +91,7 @@ public class FlinkCdcMultiTableSink implements Serializable { this.tableFilter = tableFilter; } - private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() { + private StoreSinkWrite.Provider createWriteProvider() { // for now, no compaction for multiplexed sink return (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> new StoreSinkWriteImpl( @@ -118,7 +118,7 @@ public class FlinkCdcMultiTableSink implements Serializable { public DataStreamSink<?> sinkFrom( DataStream<CdcMultiplexRecord> input, String commitUser, - StoreSinkWrite.WithWriteBufferProvider sinkProvider) { + StoreSinkWrite.Provider sinkProvider) { StreamExecutionEnvironment env = input.getExecutionEnvironment(); assertStreamingConfiguration(env); MultiTableCommittableTypeInfo typeInfo = new MultiTableCommittableTypeInfo(); @@ -151,8 +151,7 @@ public class FlinkCdcMultiTableSink implements Serializable { } protected OneInputStreamOperatorFactory<CdcMultiplexRecord, MultiTableCommittable> - createWriteOperator( - StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) { + createWriteOperator(StoreSinkWrite.Provider writeProvider, String commitUser) { return new CdcRecordStoreMultiWriteOperator.Factory( catalogLoader, writeProvider, commitUser, new Options()); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java index f00229d998..d27b919fa1 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java @@ -256,7 +256,7 @@ public class CdcRecordStoreWriteOperatorTest { CdcRecordStoreWriteOperator.Factory operatorFactory = new CdcRecordStoreWriteOperator.Factory( table, - (t, commitUser, state, ioManager, memoryPool, metricGroup) -> + (t, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> new StoreSinkWriteImpl( t, commitUser, @@ -265,7 +265,7 @@ public class CdcRecordStoreWriteOperatorTest { false, false, true, - memoryPool, + memoryPoolFactory, metricGroup), commitUser); TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java index 62341a180d..a5702d7c6d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java @@ -21,7 +21,7 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.utils.SnapshotManager; @@ -74,7 +74,7 @@ public class GlobalFullCompactionSinkWrite extends StoreSinkWriteImpl { boolean waitCompaction, int deltaCommits, boolean isStreaming, - @Nullable MemorySegmentPool memoryPool, + MemoryPoolFactory memoryPoolFactory, MetricGroup metricGroup) { super( table, @@ -84,7 +84,7 @@ public class GlobalFullCompactionSinkWrite extends StoreSinkWriteImpl { ignorePreviousFiles, waitCompaction, isStreaming, - memoryPool, + memoryPoolFactory, metricGroup); this.deltaCommits = deltaCommits; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java index e6f683d4b1..e8b27083b0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LookupSinkWrite.java @@ -19,15 +19,13 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.operation.AbstractFileStoreWrite; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -48,7 +46,7 @@ public class LookupSinkWrite extends StoreSinkWriteImpl { boolean ignorePreviousFiles, boolean waitCompaction, boolean isStreaming, - @Nullable MemorySegmentPool memoryPool, + MemoryPoolFactory memoryPoolFactory, MetricGroup metricGroup) { super( table, @@ -58,7 +56,7 @@ public class LookupSinkWrite extends StoreSinkWriteImpl { ignorePreviousFiles, waitCompaction, isStreaming, - memoryPool, + memoryPoolFactory, metricGroup); this.tableName = table.name(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 3687fea6f1..7d7ba29760 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -165,7 +165,7 @@ public class MultiTablesStoreCompactOperator commitUser, state, getContainingTask().getEnvironment().getIOManager(), - memoryPool, + memoryPoolFactory, getMetricGroup())); if (write.streamingMode()) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java index e66819398e..35f5ff15b9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java @@ -18,8 +18,11 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.memory.FlinkMemorySegmentPool; import org.apache.paimon.flink.memory.MemorySegmentAllocator; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.options.Options; @@ -51,7 +54,7 @@ public abstract class PrepareCommitOperator<IN, OUT> extends AbstractStreamOpera private static final long serialVersionUID = 1L; - @Nullable protected transient MemorySegmentPool memoryPool; + protected transient MemoryPoolFactory memoryPoolFactory; @Nullable private transient MemorySegmentAllocator memoryAllocator; protected final Options options; private boolean endOfInput = false; @@ -67,6 +70,8 @@ public abstract class PrepareCommitOperator<IN, OUT> extends AbstractStreamOpera StreamConfig config, Output<StreamRecord<OUT>> output) { super.setup(containingTask, config, output); + + MemorySegmentPool memoryPool; if (options.get(SINK_USE_MANAGED_MEMORY)) { MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager(); memoryAllocator = new MemorySegmentAllocator(containingTask, memoryManager); @@ -75,7 +80,13 @@ public abstract class PrepareCommitOperator<IN, OUT> extends AbstractStreamOpera computeManagedMemory(this), memoryManager.getPageSize(), memoryAllocator); + } else { + CoreOptions coreOptions = new CoreOptions(options); + memoryPool = + new HeapMemorySegmentPool( + coreOptions.writeBufferSize(), coreOptions.pageSize()); } + memoryPoolFactory = new MemoryPoolFactory(memoryPool); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java index 7676084b94..989c8e91dc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java @@ -117,7 +117,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ commitUser, state, getContainingTask().getEnvironment().getIOManager(), - memoryPool, + memoryPoolFactory, getMetricGroup()); this.writeRefresher = WriterRefresher.create(write.streamingMode(), table, write::replace); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java index d88cbea6f0..a7b1434740 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java @@ -23,7 +23,6 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.memory.MemoryPoolFactory; -import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.operation.WriteRestore; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; @@ -96,7 +95,7 @@ public interface StoreSinkWrite { String commitUser, StoreSinkWriteState state, IOManager ioManager, - @Nullable MemorySegmentPool memoryPool, + MemoryPoolFactory memoryPoolFactory, @Nullable MetricGroup metricGroup); } @@ -141,7 +140,7 @@ public interface StoreSinkWrite { if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) { int finalDeltaCommits = Math.max(deltaCommits, 1); - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> { assertNoSinkMaterializer.run(); return new GlobalFullCompactionSinkWrite( table, @@ -152,13 +151,13 @@ public interface StoreSinkWrite { waitCompaction, finalDeltaCommits, isStreaming, - memoryPool, + memoryPoolFactory, metricGroup); }; } if (coreOptions.needLookup()) { - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> { assertNoSinkMaterializer.run(); return new LookupSinkWrite( table, @@ -168,13 +167,13 @@ public interface StoreSinkWrite { ignorePreviousFiles, waitCompaction, isStreaming, - memoryPool, + memoryPoolFactory, metricGroup); }; } } - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> { assertNoSinkMaterializer.run(); return new StoreSinkWriteImpl( table, @@ -184,25 +183,8 @@ public interface StoreSinkWrite { ignorePreviousFiles, waitCompaction, isStreaming, - memoryPool, + memoryPoolFactory, metricGroup); }; } - - /** Provider of {@link StoreSinkWrite} that uses given write buffer. */ - @FunctionalInterface - interface WithWriteBufferProvider extends Serializable { - - /** - * TODO: The argument list has become too complicated. Build {@link TableWriteImpl} directly - * in caller and simplify the argument list. - */ - StoreSinkWrite provide( - FileStoreTable table, - String commitUser, - StoreSinkWriteState state, - IOManager ioManager, - @Nullable MemoryPoolFactory memoryPoolFactory, - MetricGroup metricGroup); - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index 9c65f221aa..2d3742a95a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -24,9 +24,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryPoolFactory; -import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.operation.WriteRestore; import org.apache.paimon.table.FileStoreTable; @@ -46,8 +44,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Default implementation of {@link StoreSinkWrite}. This writer does not have states. */ public class StoreSinkWriteImpl implements StoreSinkWrite { @@ -59,35 +55,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite { private final boolean ignorePreviousFiles; private final boolean waitCompaction; private final boolean isStreamingMode; - @Nullable private final MemorySegmentPool memoryPool; - @Nullable private final MemoryPoolFactory memoryPoolFactory; - - protected TableWriteImpl<?> write; - + private final MemoryPoolFactory memoryPoolFactory; @Nullable private final MetricGroup metricGroup; - public StoreSinkWriteImpl( - FileStoreTable table, - String commitUser, - StoreSinkWriteState state, - IOManager ioManager, - boolean ignorePreviousFiles, - boolean waitCompaction, - boolean isStreamingMode, - @Nullable MemorySegmentPool memoryPool, - @Nullable MetricGroup metricGroup) { - this( - table, - commitUser, - state, - ioManager, - ignorePreviousFiles, - waitCompaction, - isStreamingMode, - memoryPool, - null, - metricGroup); - } + protected TableWriteImpl<?> write; public StoreSinkWriteImpl( FileStoreTable table, @@ -99,67 +70,29 @@ public class StoreSinkWriteImpl implements StoreSinkWrite { boolean isStreamingMode, MemoryPoolFactory memoryPoolFactory, @Nullable MetricGroup metricGroup) { - this( - table, - commitUser, - state, - ioManager, - ignorePreviousFiles, - waitCompaction, - isStreamingMode, - null, - memoryPoolFactory, - metricGroup); - } - - private StoreSinkWriteImpl( - FileStoreTable table, - String commitUser, - StoreSinkWriteState state, - IOManager ioManager, - boolean ignorePreviousFiles, - boolean waitCompaction, - boolean isStreamingMode, - @Nullable MemorySegmentPool memoryPool, - @Nullable MemoryPoolFactory memoryPoolFactory, - @Nullable MetricGroup metricGroup) { this.commitUser = commitUser; this.state = state; this.paimonIOManager = new IOManagerImpl(ioManager.getSpillingDirectoriesPaths()); this.ignorePreviousFiles = ignorePreviousFiles; this.waitCompaction = waitCompaction; this.isStreamingMode = isStreamingMode; - this.memoryPool = memoryPool; this.memoryPoolFactory = memoryPoolFactory; this.metricGroup = metricGroup; this.write = newTableWrite(table); } private TableWriteImpl<?> newTableWrite(FileStoreTable table) { - checkArgument( - !(memoryPool != null && memoryPoolFactory != null), - "memoryPool and memoryPoolFactory cannot be set at the same time."); - TableWriteImpl<?> tableWrite = table.newWrite(commitUser, state.getSubtaskId()) .withIOManager(paimonIOManager) .withIgnorePreviousFiles(ignorePreviousFiles) - .withBucketMode(table.bucketMode()); + .withBucketMode(table.bucketMode()) + .withMemoryPoolFactory(memoryPoolFactory); if (metricGroup != null) { tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup)); } - - if (memoryPoolFactory != null) { - return tableWrite.withMemoryPoolFactory(memoryPoolFactory); - } else { - return tableWrite.withMemoryPool( - memoryPool != null - ? memoryPool - : new HeapMemorySegmentPool( - table.coreOptions().writeBufferSize(), - table.coreOptions().pageSize())); - } + return tableWrite; } public void withCompactExecutor(ExecutorService compactExecutor) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index ca27059265..caf92cb742 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -94,7 +94,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C getCommitUser(context), state, getContainingTask().getEnvironment().getIOManager(), - memoryPool, + memoryPoolFactory, getMetricGroup()); if (writeRestore != null) { write.setWriteRestore(writeRestore); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java index 39664c3aa3..7b058328cb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java @@ -19,11 +19,6 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.Snapshot; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogLoader; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.flink.FlinkConnectorOptions; @@ -31,7 +26,6 @@ import org.apache.paimon.flink.source.CompactorSourceBuilder; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; @@ -49,13 +43,8 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -70,7 +59,6 @@ import java.util.Map; import java.util.Random; import java.util.UUID; -import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */ @@ -216,98 +204,4 @@ public class CompactorSinkITCase extends AbstractTestBase { "")); return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema); } - - private FileStoreTable createCatalogTable(Catalog catalog, Identifier tableIdentifier) - throws Exception { - Schema tableSchema = - new Schema( - ROW_TYPE.getFields(), - Collections.emptyList(), - Collections.singletonList("k"), - Collections.singletonMap("bucket", "1"), - ""); - catalog.createTable(tableIdentifier, tableSchema, false); - return (FileStoreTable) catalog.getTable(tableIdentifier); - } - - private OneInputStreamOperatorTestHarness<RowData, Committable> createTestHarness( - OneInputStreamOperator<RowData, Committable> operator) throws Exception { - TypeSerializer<Committable> serializer = - new CommittableTypeInfo().createSerializer(new ExecutionConfig()); - OneInputStreamOperatorTestHarness<RowData, Committable> harness = - new OneInputStreamOperatorTestHarness<>(operator); - harness.setup(serializer); - return harness; - } - - private OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable> - createMultiTablesTestHarness( - OneInputStreamOperator<RowData, MultiTableCommittable> operator) - throws Exception { - TypeSerializer<MultiTableCommittable> serializer = - new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig()); - OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable> harness = - new OneInputStreamOperatorTestHarness<>(operator); - harness.setup(serializer); - return harness; - } - - protected StoreCompactOperator.Factory createCompactOperator(FileStoreTable table) { - return new StoreCompactOperator.Factory( - table, - (t, commitUser, state, ioManager, memoryPool, metricGroup) -> - new StoreSinkWriteImpl( - t, - commitUser, - state, - ioManager, - false, - false, - false, - memoryPool, - metricGroup), - "test", - true); - } - - protected MultiTablesStoreCompactOperator.Factory createMultiTablesCompactOperator( - CatalogLoader catalogLoader) throws Exception { - return new MultiTablesStoreCompactOperator.Factory( - catalogLoader, - commitUser, - new CheckpointConfig(), - false, - false, - true, - new Options()); - } - - private static byte[] partition(String dt, int hh) { - BinaryRow row = new BinaryRow(2); - BinaryRowWriter writer = new BinaryRowWriter(row); - writer.writeString(0, BinaryString.fromString(dt)); - writer.writeInt(1, hh); - writer.complete(); - return serializeBinaryRow(row); - } - - private void prepareDataFile(FileStoreTable table) throws Exception { - StreamWriteBuilder streamWriteBuilder = - table.newStreamWriteBuilder().withCommitUser(commitUser); - StreamTableWrite write = streamWriteBuilder.newWrite(); - StreamTableCommit commit = streamWriteBuilder.newCommit(); - - write.write(rowData(1, 100, 15, BinaryString.fromString("20221208"))); - write.write(rowData(1, 100, 16, BinaryString.fromString("20221208"))); - write.write(rowData(1, 100, 15, BinaryString.fromString("20221209"))); - commit.commit(0, write.prepareCommit(true, 0)); - - write.write(rowData(2, 200, 15, BinaryString.fromString("20221208"))); - write.write(rowData(2, 200, 16, BinaryString.fromString("20221208"))); - write.write(rowData(2, 200, 15, BinaryString.fromString("20221209"))); - commit.commit(1, write.prepareCommit(true, 1)); - - write.close(); - commit.close(); - } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java index 0243270b6e..f90674c668 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java @@ -55,7 +55,7 @@ public class StoreCompactOperatorTest extends TableTestBase { StoreCompactOperator.Factory operatorFactory = new StoreCompactOperator.Factory( getTableDefault(), - (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> compactRememberStoreWrite, "10086", !streamingMode); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java index 757d25de1c..109a3f984e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java @@ -550,7 +550,7 @@ public class WriterOperatorTest { return new RowDataStoreWriteOperator.Factory( fileStoreTable, null, - (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + (table, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> new LookupSinkWrite( table, commitUser, @@ -559,7 +559,7 @@ public class WriterOperatorTest { false, waitCompaction, true, - memoryPool, + memoryPoolFactory, metricGroup), commitUser); }
