This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit c2bdec5f6fd2d74eb4bc056a0558e98c7be5fb10
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 16 10:31:54 2025 +0800

    [core] Introduce auto-buffer-spill for postpone bucket write (#5744)
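
    The postpone-bucket writer previously wrote every record straight through a
    rolling file writer and had no way to react to memory pressure. This change
    extracts the SinkWriter abstraction from AppendOnlyWriter into
    org.apache.paimon.utils.SinkWriter and reuses it in PostponeBucketWriter
    (which now implements MemoryOwner): records start in a DirectSinkWriter, and
    when MemoryFileStoreWrite detects memory pressure it calls forceBufferSpill(),
    which switches each writer to a spillable BufferedSinkWriter and re-buffers
    the small files that were already written.

    Below is a toy, self-contained sketch of the write / flush-on-full / retry
    loop the new writers follow; the class and constant names are illustrative
    only and are not part of this commit.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    class BufferThenFlushDemo {

        private static final int CAPACITY = 4;   // stand-in for the memory pool limit
        private final Deque<String> buffer = new ArrayDeque<>();
        private final List<String> flushedFiles = new ArrayList<>();

        // Mirrors the retry logic in PostponeBucketWriter#write:
        // flush a full buffer, then try the same record once more.
        void write(String record) {
            if (!tryPut(record)) {
                flush();
                if (!tryPut(record)) {
                    throw new RuntimeException("Buffer too small to hold a single element.");
                }
            }
        }

        private boolean tryPut(String record) {
            if (buffer.size() >= CAPACITY) {
                return false;                     // full: caller must flush first
            }
            buffer.add(record);
            return true;
        }

        private void flush() {
            // The real writer rolls the buffered rows into data files here.
            flushedFiles.add("data-file-" + flushedFiles.size());
            buffer.clear();
        }
    }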
---
 .../java/org/apache/paimon/KeyValueFileStore.java  |   2 +
 .../org/apache/paimon/append/AppendOnlyWriter.java | 216 +++----------------
 .../paimon/operation/AbstractFileStoreWrite.java   |   2 +-
 .../postpone/PostponeBucketFileStoreWrite.java     |  71 ++++++-
 .../paimon/postpone/PostponeBucketWriter.java      | 141 +++++++++++--
 .../java/org/apache/paimon/utils/SinkWriter.java   | 234 +++++++++++++++++++++
 .../paimon/table/PrimaryKeySimpleTableTest.java    |  38 ++++
 7 files changed, 495 insertions(+), 209 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 81039da369..be4416af9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -177,12 +177,14 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
         if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
             return new PostponeBucketFileStoreWrite(
                     fileIO,
+                    pathFactory(),
                     schema,
                     commitUser,
                     partitionType,
                     keyType,
                     valueType,
                     this::pathFactory,
+                    newReaderFactoryBuilder(),
                     snapshotManager(),
                    newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
                     options,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 694937c88f..3534ac4e9e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -23,7 +23,6 @@ import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.RowBuffer;
 import org.apache.paimon.fileindex.FileIndexOptions;
@@ -45,14 +44,15 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BatchRecordWriter;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.IOFunction;
-import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.LongCounter;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.SinkWriter;
+import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
+import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -72,7 +72,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     private final RowType writeSchema;
     private final DataFilePathFactory pathFactory;
     private final CompactManager compactManager;
-    private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> bucketFileRead;
+    private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead;
     private final boolean forceCompact;
     private final boolean asyncFileWrite;
     private final boolean statsDenseStore;
@@ -80,17 +80,17 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     private final List<DataFileMeta> deletedFiles;
     private final List<DataFileMeta> compactBefore;
     private final List<DataFileMeta> compactAfter;
-    @Nullable private CompactDeletionFile compactDeletionFile;
     private final LongCounter seqNumCounter;
     private final String fileCompression;
     private final CompressOptions spillCompression;
-    private SinkWriter sinkWriter;
     private final SimpleColStatsCollector.Factory[] statsCollectors;
     @Nullable private final IOManager ioManager;
     private final FileIndexOptions fileIndexOptions;
+    private final MemorySize maxDiskSize;
 
+    @Nullable private CompactDeletionFile compactDeletionFile;
+    private SinkWriter<InternalRow> sinkWriter;
     private MemorySegmentPool memorySegmentPool;
-    private final MemorySize maxDiskSize;
 
     public AppendOnlyWriter(
             FileIO fileIO,
@@ -101,7 +101,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
             RowType writeSchema,
             long maxSequenceNumber,
             CompactManager compactManager,
-            IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> bucketFileRead,
+            IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead,
             boolean forceCompact,
             DataFilePathFactory pathFactory,
             @Nullable CommitIncrement increment,
@@ -121,7 +121,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
         this.writeSchema = writeSchema;
         this.pathFactory = pathFactory;
         this.compactManager = compactManager;
-        this.bucketFileRead = bucketFileRead;
+        this.dataFileRead = dataFileRead;
         this.forceCompact = forceCompact;
         this.asyncFileWrite = asyncFileWrite;
         this.statsDenseStore = statsDenseStore;
@@ -139,8 +139,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
 
         this.sinkWriter =
                 useWriteBuffer
-                        ? new BufferedSinkWriter(spillable, maxDiskSize, spillCompression)
-                        : new DirectSinkWriter();
+                        ? createBufferedSinkWriter(spillable)
+                        : new DirectSinkWriter<>(this::createRollingRowWriter);
 
         if (increment != null) {
             newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -151,6 +151,18 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
         }
     }
 
+    private BufferedSinkWriter<InternalRow> createBufferedSinkWriter(boolean spillable) {
+        return new BufferedSinkWriter<>(
+                this::createRollingRowWriter,
+                t -> t,
+                t -> t,
+                ioManager,
+                writeSchema,
+                spillable,
+                maxDiskSize,
+                spillCompression);
+    }
+
     @Override
     public void write(InternalRow rowData) throws Exception {
         Preconditions.checkArgument(
@@ -178,7 +190,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
                 write(row);
             }
         } else {
-            ((DirectSinkWriter) sinkWriter).writeBundle(bundle);
+            ((DirectSinkWriter<?>) sinkWriter).writeBundle(bundle);
         }
     }
 
@@ -252,16 +264,16 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     }
 
     public void toBufferedWriter() throws Exception {
-        if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && bucketFileRead != null) {
+        if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && dataFileRead != null) {
             // fetch the written results
             List<DataFileMeta> files = sinkWriter.flush();
 
             sinkWriter.close();
-            sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression);
+            sinkWriter = createBufferedSinkWriter(true);
             sinkWriter.setMemoryPool(memorySegmentPool);
 
             // rewrite small files
-            try (RecordReaderIterator<InternalRow> reader = bucketFileRead.apply(files)) {
+            try (RecordReaderIterator<InternalRow> reader = dataFileRead.apply(files)) {
                 while (reader.hasNext()) {
                     sinkWriter.write(reader.next());
                 }
@@ -356,7 +368,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     @VisibleForTesting
     public RowBuffer getWriteBuffer() {
         if (sinkWriter instanceof BufferedSinkWriter) {
-            return ((BufferedSinkWriter) sinkWriter).writeBuffer;
+            return ((BufferedSinkWriter<?>) sinkWriter).rowBuffer();
         } else {
             return null;
         }
@@ -366,176 +378,4 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     List<DataFileMeta> getNewFiles() {
         return newFiles;
     }
-
-    /** Internal interface to Sink Data from input. */
-    private interface SinkWriter {
-
-        boolean write(InternalRow data) throws IOException;
-
-        List<DataFileMeta> flush() throws IOException;
-
-        boolean flushMemory() throws IOException;
-
-        long memoryOccupancy();
-
-        void close();
-
-        void setMemoryPool(MemorySegmentPool memoryPool);
-
-        boolean bufferSpillableWriter();
-    }
-
-    /**
-     * Directly sink data to file, no memory cache here, use OrcWriter/ParquetWrite/etc directly
-     * write data. May cause out-of-memory.
-     */
-    private class DirectSinkWriter implements SinkWriter {
-
-        private RowDataRollingFileWriter writer;
-
-        @Override
-        public boolean write(InternalRow data) throws IOException {
-            if (writer == null) {
-                writer = createRollingRowWriter();
-            }
-            writer.write(data);
-            return true;
-        }
-
-        public void writeBundle(BundleRecords bundle) throws IOException {
-            if (writer == null) {
-                writer = createRollingRowWriter();
-            }
-            writer.writeBundle(bundle);
-        }
-
-        @Override
-        public List<DataFileMeta> flush() throws IOException {
-            List<DataFileMeta> flushedFiles = new ArrayList<>();
-            if (writer != null) {
-                writer.close();
-                flushedFiles.addAll(writer.result());
-                writer = null;
-            }
-            return flushedFiles;
-        }
-
-        @Override
-        public boolean flushMemory() throws IOException {
-            return false;
-        }
-
-        @Override
-        public long memoryOccupancy() {
-            return 0;
-        }
-
-        @Override
-        public void close() {
-            if (writer != null) {
-                writer.abort();
-                writer = null;
-            }
-        }
-
-        @Override
-        public void setMemoryPool(MemorySegmentPool memoryPool) {
-            // do nothing
-        }
-
-        @Override
-        public boolean bufferSpillableWriter() {
-            return false;
-        }
-    }
-
-    /**
-     * Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint
-     * acknowledge time. When non-spillable, may cause too many small files.
-     */
-    private class BufferedSinkWriter implements SinkWriter {
-
-        private final boolean spillable;
-
-        private final MemorySize maxDiskSize;
-
-        private final CompressOptions compression;
-
-        private RowBuffer writeBuffer;
-
-        private BufferedSinkWriter(
-                boolean spillable, MemorySize maxDiskSize, CompressOptions compression) {
-            this.spillable = spillable;
-            this.maxDiskSize = maxDiskSize;
-            this.compression = compression;
-        }
-
-        @Override
-        public boolean write(InternalRow data) throws IOException {
-            return writeBuffer.put(data);
-        }
-
-        @Override
-        public List<DataFileMeta> flush() throws IOException {
-            List<DataFileMeta> flushedFiles = new ArrayList<>();
-            if (writeBuffer != null) {
-                writeBuffer.complete();
-                RowDataRollingFileWriter writer = createRollingRowWriter();
-                IOException exception = null;
-                try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) {
-                    while (iterator.advanceNext()) {
-                        writer.write(iterator.getRow());
-                    }
-                } catch (IOException e) {
-                    exception = e;
-                } finally {
-                    if (exception != null) {
-                        IOUtils.closeQuietly(writer);
-                        // cleanup code that might throw another exception
-                        throw exception;
-                    }
-                    writer.close();
-                }
-                flushedFiles.addAll(writer.result());
-                // reuse writeBuffer
-                writeBuffer.reset();
-            }
-            return flushedFiles;
-        }
-
-        @Override
-        public long memoryOccupancy() {
-            return writeBuffer.memoryOccupancy();
-        }
-
-        @Override
-        public void close() {
-            if (writeBuffer != null) {
-                writeBuffer.reset();
-                writeBuffer = null;
-            }
-        }
-
-        @Override
-        public void setMemoryPool(MemorySegmentPool memoryPool) {
-            writeBuffer =
-                    RowBuffer.getBuffer(
-                            ioManager,
-                            memoryPool,
-                            new InternalRowSerializer(writeSchema),
-                            spillable,
-                            maxDiskSize,
-                            compression);
-        }
-
-        @Override
-        public boolean bufferSpillableWriter() {
-            return spillable;
-        }
-
-        @Override
-        public boolean flushMemory() throws IOException {
-            return writeBuffer.flushMemory();
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index fad898da45..a7bac3f12a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -567,7 +567,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
     }
 
     @VisibleForTesting
-    Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
+    public Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
         return writers;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index 09b3afdff2..97b5dfe7c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -26,11 +26,15 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.avro.AvroSchemaConverter;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.operation.MemoryFileStoreWrite;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.ReaderSupplier;
+import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
@@ -39,10 +43,17 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiFunction;
@@ -52,25 +63,37 @@ import static org.apache.paimon.format.FileFormat.fileFormat;
 import static org.apache.paimon.utils.FileStorePathFactory.createFormatPathFactories;
 
 /** {@link FileStoreWrite} for {@code bucket = -2} tables. */
-public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
+public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PostponeBucketFileStoreWrite.class);
 
     private final CoreOptions options;
     private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
+    private final FileIO fileIO;
+    private final FileStorePathFactory pathFactory;
+    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
+
+    private boolean forceBufferSpill = false;
 
     public PostponeBucketFileStoreWrite(
             FileIO fileIO,
+            FileStorePathFactory pathFactory,
             TableSchema schema,
             String commitUser,
             RowType partitionType,
             RowType keyType,
             RowType valueType,
             BiFunction<CoreOptions, String, FileStorePathFactory> formatPathFactory,
+            KeyValueFileReaderFactory.Builder readerFactoryBuilder,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
             String tableName,
             @Nullable Integer writeId) {
-        super(snapshotManager, scan, null, null, tableName, options, partitionType);
+        super(snapshotManager, scan, options, partitionType, null, null, tableName);
+        this.fileIO = fileIO;
+        this.pathFactory = pathFactory;
+        this.readerFactoryBuilder = readerFactoryBuilder;
 
         Options newOptions = new Options(options.toMap());
         try {
@@ -120,6 +143,25 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu
         withIgnorePreviousFiles(true);
     }
 
+    @Override
+    protected void forceBufferSpill() throws Exception {
+        if (ioManager == null) {
+            return;
+        }
+        if (forceBufferSpill) {
+            return;
+        }
+        forceBufferSpill = true;
+        LOG.info(
+                "Force buffer spill for postpone file store write, writer number is: {}",
+                writers.size());
+        for (Map<Integer, WriterContainer<KeyValue>> bucketWriters : writers.values()) {
+            for (WriterContainer<KeyValue> writerContainer : bucketWriters.values()) {
+                ((PostponeBucketWriter) writerContainer.writer).toBufferedWriter();
+            }
+        }
+    }
+
     @Override
     public void withIgnorePreviousFiles(boolean ignorePrevious) {
         // see comments in constructor
@@ -141,7 +183,28 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu
                 "Postpone bucket writers should not restore previous files. This is unexpected.");
         KeyValueFileWriterFactory writerFactory =
                 writerFactoryBuilder.build(partition, bucket, options);
-        return new PostponeBucketWriter(writerFactory, restoreIncrement);
+        return new PostponeBucketWriter(
+                fileIO,
+                pathFactory.createDataFilePathFactory(partition, bucket),
+                options.spillCompressOptions(),
+                options.writeBufferSpillDiskSize(),
+                ioManager,
+                writerFactory,
+                files -> newFileRead(partition, bucket, files),
+                forceBufferSpill,
+                forceBufferSpill,
+                restoreIncrement);
+    }
+
+    private RecordReaderIterator<KeyValue> newFileRead(
+            BinaryRow partition, int bucket, List<DataFileMeta> files) throws IOException {
+        KeyValueFileReaderFactory readerFactory =
+                readerFactoryBuilder.build(partition, bucket, name -> Optional.empty());
+        List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
+        for (DataFileMeta file : files) {
+            suppliers.add(() -> readerFactory.createRecordReader(file));
+        }
+        return new RecordReaderIterator<>(ConcatRecordReader.create(suppliers));
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
index 10259650d4..c41db8aac7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
@@ -19,14 +19,29 @@
 package org.apache.paimon.postpone;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.KeyValueSerializer;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.memory.MemoryOwner;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.IOFunction;
 import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.SinkWriter;
+import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
+import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
 
 import javax.annotation.Nullable;
 
@@ -36,30 +51,69 @@ import java.util.Collections;
 import java.util.List;
 
 /** {@link RecordWriter} for {@code bucket = -2} tables. */
-public class PostponeBucketWriter implements RecordWriter<KeyValue> {
+public class PostponeBucketWriter implements RecordWriter<KeyValue>, MemoryOwner {
 
+    private final FileIO fileIO;
+    private final DataFilePathFactory pathFactory;
     private final KeyValueFileWriterFactory writerFactory;
     private final List<DataFileMeta> files;
+    private final IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead;
+    private final @Nullable IOManager ioManager;
+    private final CompressOptions spillCompression;
+    private final MemorySize maxDiskSize;
 
-    private RollingFileWriter<KeyValue, DataFileMeta> writer;
+    private SinkWriter<KeyValue> sinkWriter;
+    private MemorySegmentPool memorySegmentPool;
 
     public PostponeBucketWriter(
-            KeyValueFileWriterFactory writerFactory, @Nullable CommitIncrement restoreIncrement) {
+            FileIO fileIO,
+            DataFilePathFactory pathFactory,
+            CompressOptions spillCompression,
+            MemorySize maxDiskSize,
+            @Nullable IOManager ioManager,
+            KeyValueFileWriterFactory writerFactory,
+            IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead,
+            boolean useWriteBuffer,
+            boolean spillable,
+            @Nullable CommitIncrement restoreIncrement) {
+        this.ioManager = ioManager;
         this.writerFactory = writerFactory;
+        this.fileRead = fileRead;
+        this.fileIO = fileIO;
+        this.pathFactory = pathFactory;
+        this.spillCompression = spillCompression;
+        this.maxDiskSize = maxDiskSize;
         this.files = new ArrayList<>();
         if (restoreIncrement != null) {
             files.addAll(restoreIncrement.newFilesIncrement().newFiles());
         }
+        this.sinkWriter =
+                useWriteBuffer
+                        ? createBufferedSinkWriter(spillable)
+                        : new DirectSinkWriter<>(this::createRollingRowWriter);
+    }
 
-        this.writer = null;
+    private RollingFileWriter<KeyValue, DataFileMeta> createRollingRowWriter() {
+        return writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
     }
 
     @Override
     public void write(KeyValue record) throws Exception {
-        if (writer == null) {
-            writer = writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND);
+        boolean success = sinkWriter.write(record);
+        if (!success) {
+            flush();
+            success = sinkWriter.write(record);
+            if (!success) {
+                // Should not get here, because writeBuffer will throw a "too big" exception itself.
+                // But we throw again in case something unexpected happens (e.g. someone changed
+                // the code in SpillableBuffer).
+                throw new RuntimeException("Mem table is too small to hold a single element.");
+            }
         }
-        writer.write(record);
+    }
+
+    private void flush() throws Exception {
+        files.addAll(sinkWriter.flush());
     }
 
     @Override
@@ -82,13 +136,66 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> {
     }
 
     @Override
-    public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
-        if (writer != null) {
-            writer.close();
-            files.addAll(writer.result());
-            writer = null;
+    public void setMemoryPool(MemorySegmentPool memoryPool) {
+        this.memorySegmentPool = memoryPool;
+        sinkWriter.setMemoryPool(memoryPool);
+    }
+
+    @Override
+    public long memoryOccupancy() {
+        return sinkWriter.memoryOccupancy();
+    }
+
+    @Override
+    public void flushMemory() throws Exception {
+        boolean success = sinkWriter.flushMemory();
+        if (!success) {
+            flush();
         }
+    }
 
+    private BufferedSinkWriter<KeyValue> createBufferedSinkWriter(boolean spillable) {
+        RowType keyType = writerFactory.keyType();
+        RowType valueType = writerFactory.valueType();
+        RowType kvRowType = KeyValue.schema(keyType, valueType);
+        KeyValueSerializer serializer = new KeyValueSerializer(keyType, valueType);
+        return new BufferedSinkWriter<>(
+                this::createRollingRowWriter,
+                serializer::toRow,
+                serializer::fromRow,
+                ioManager,
+                kvRowType,
+                spillable,
+                maxDiskSize,
+                spillCompression);
+    }
+
+    public void toBufferedWriter() throws Exception {
+        if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && fileRead != null) {
+            // fetch the written results
+            List<DataFileMeta> files = sinkWriter.flush();
+
+            sinkWriter.close();
+            sinkWriter = createBufferedSinkWriter(true);
+            sinkWriter.setMemoryPool(memorySegmentPool);
+
+            // rewrite small files
+            try (RecordReaderIterator<KeyValue> reader = fileRead.apply(files)) {
+                while (reader.hasNext()) {
+                    sinkWriter.write(reader.next());
+                }
+            } finally {
+                // remove small files
+                for (DataFileMeta file : files) {
+                    fileIO.deleteQuietly(pathFactory.toPath(file));
+                }
+            }
+        }
+    }
+
+    @Override
+    public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
+        flush();
         List<DataFileMeta> result = new ArrayList<>(files);
         files.clear();
         return new CommitIncrement(
@@ -97,6 +204,11 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> {
                 null);
     }
 
+    @VisibleForTesting
+    public boolean useBufferedSinkWriter() {
+        return sinkWriter instanceof BufferedSinkWriter;
+    }
+
     @Override
     public boolean compactNotCompleted() {
         return false;
@@ -107,9 +219,6 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> {
 
     @Override
     public void close() throws Exception {
-        if (writer != null) {
-            writer.abort();
-            writer = null;
-        }
+        sinkWriter.close();
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
new file mode 100644
index 0000000000..c4596a1df4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Internal interface to Sink Data from input. */
+public interface SinkWriter<T> {
+
+    boolean write(T data) throws IOException;
+
+    List<DataFileMeta> flush() throws IOException;
+
+    boolean flushMemory() throws IOException;
+
+    long memoryOccupancy();
+
+    void close();
+
+    void setMemoryPool(MemorySegmentPool memoryPool);
+
+    boolean bufferSpillableWriter();
+
+    /**
+     * Directly sink data to file, no memory cache here, use OrcWriter/ParquetWrite/etc directly
+     * write data. May cause out-of-memory.
+     */
+    class DirectSinkWriter<T> implements SinkWriter<T> {
+
+        private final Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier;
+
+        private RollingFileWriter<T, DataFileMeta> writer;
+
+        public DirectSinkWriter(Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier) {
+            this.writerSupplier = writerSupplier;
+        }
+
+        @Override
+        public boolean write(T data) throws IOException {
+            if (writer == null) {
+                writer = writerSupplier.get();
+            }
+            writer.write(data);
+            return true;
+        }
+
+        public void writeBundle(BundleRecords bundle) throws IOException {
+            if (writer == null) {
+                writer = writerSupplier.get();
+            }
+            writer.writeBundle(bundle);
+        }
+
+        @Override
+        public List<DataFileMeta> flush() throws IOException {
+            List<DataFileMeta> flushedFiles = new ArrayList<>();
+            if (writer != null) {
+                writer.close();
+                flushedFiles.addAll(writer.result());
+                writer = null;
+            }
+            return flushedFiles;
+        }
+
+        @Override
+        public boolean flushMemory() throws IOException {
+            return false;
+        }
+
+        @Override
+        public long memoryOccupancy() {
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            if (writer != null) {
+                writer.abort();
+                writer = null;
+            }
+        }
+
+        @Override
+        public void setMemoryPool(MemorySegmentPool memoryPool) {
+            // do nothing
+        }
+
+        @Override
+        public boolean bufferSpillableWriter() {
+            return false;
+        }
+    }
+
+    /**
+     * Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint
+     * acknowledge time. When non-spillable, may cause too many small files.
+     */
+    class BufferedSinkWriter<T> implements SinkWriter<T> {
+
+        private final Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier;
+        private final Function<T, InternalRow> toRow;
+        private final Function<InternalRow, T> fromRow;
+        private final IOManager ioManager;
+        private final RowType rowType;
+        private final boolean spillable;
+        private final MemorySize maxDiskSize;
+        private final CompressOptions compression;
+
+        private RowBuffer writeBuffer;
+
+        public BufferedSinkWriter(
+                Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier,
+                Function<T, InternalRow> toRow,
+                Function<InternalRow, T> fromRow,
+                IOManager ioManager,
+                RowType rowType,
+                boolean spillable,
+                MemorySize maxDiskSize,
+                CompressOptions compression) {
+            this.writerSupplier = writerSupplier;
+            this.toRow = toRow;
+            this.fromRow = fromRow;
+            this.ioManager = ioManager;
+            this.rowType = rowType;
+            this.spillable = spillable;
+            this.maxDiskSize = maxDiskSize;
+            this.compression = compression;
+        }
+
+        public RowBuffer rowBuffer() {
+            return writeBuffer;
+        }
+
+        @Override
+        public boolean write(T data) throws IOException {
+            return writeBuffer.put(toRow.apply(data));
+        }
+
+        @Override
+        public List<DataFileMeta> flush() throws IOException {
+            List<DataFileMeta> flushedFiles = new ArrayList<>();
+            if (writeBuffer != null) {
+                writeBuffer.complete();
+                RollingFileWriter<T, DataFileMeta> writer = writerSupplier.get();
+                IOException exception = null;
+                try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) {
+                    while (iterator.advanceNext()) {
+                        writer.write(fromRow.apply(iterator.getRow()));
+                    }
+                } catch (IOException e) {
+                    exception = e;
+                } finally {
+                    if (exception != null) {
+                        IOUtils.closeQuietly(writer);
+                        // cleanup code that might throw another exception
+                        throw exception;
+                    }
+                    writer.close();
+                }
+                flushedFiles.addAll(writer.result());
+                // reuse writeBuffer
+                writeBuffer.reset();
+            }
+            return flushedFiles;
+        }
+
+        @Override
+        public long memoryOccupancy() {
+            return writeBuffer.memoryOccupancy();
+        }
+
+        @Override
+        public void close() {
+            if (writeBuffer != null) {
+                writeBuffer.reset();
+                writeBuffer = null;
+            }
+        }
+
+        @Override
+        public void setMemoryPool(MemorySegmentPool memoryPool) {
+            writeBuffer =
+                    RowBuffer.getBuffer(
+                            ioManager,
+                            memoryPool,
+                            new InternalRowSerializer(rowType),
+                            spillable,
+                            maxDiskSize,
+                            compression);
+        }
+
+        @Override
+        public boolean bufferSpillableWriter() {
+            return spillable;
+        }
+
+        @Override
+        public boolean flushMemory() throws IOException {
+            return writeBuffer.flushMemory();
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 0bf4971f94..35928e2335 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.LookupLocalFileType;
+import org.apache.paimon.KeyValue;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -34,9 +35,12 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
+import org.apache.paimon.postpone.PostponeBucketWriter;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
@@ -54,6 +58,7 @@ import org.apache.paimon.table.sink.InnerTableCommit;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.sink.WriteSelector;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
@@ -136,6 +141,39 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Tests for {@link PrimaryKeyFileStoreTable}. */
 public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
 
+    @Test
+    public void testPostponeBucketWithManyPartitions() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(options -> options.set(BUCKET, BucketMode.POSTPONE_BUCKET));
+
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.withIOManager(new IOManagerImpl(tempDir.toString()));
+            for (int i = 0; i < 100; i++) {
+                write.write(rowData(i, i, (long) i));
+            }
+
+            for (Map<Integer, AbstractFileStoreWrite.WriterContainer<KeyValue>> bucketWriters :
+                    ((PostponeBucketFileStoreWrite) ((TableWriteImpl<?>) write).getWrite())
+                            .writers()
+                            .values()) {
+                for (AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer :
+                        bucketWriters.values()) {
+                    PostponeBucketWriter writer = (PostponeBucketWriter) writerContainer.writer;
+                    assertThat(writer.useBufferedSinkWriter()).isTrue();
+                }
+            }
+            commit.commit(write.prepareCommit());
+        }
+
+        Snapshot snapshot = table.latestSnapshot().get();
+        ManifestFileMeta manifest =
+                table.manifestListReader().read(snapshot.deltaManifestList()).get(0);
+        List<ManifestEntry> entries = table.manifestFileReader().read(manifest.fileName());
+        assertThat(entries.size()).isEqualTo(100);
+    }
+
     @Test
     public void testPostponeBucket() throws Exception {
         FileStoreTable table =

