This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit c2bdec5f6fd2d74eb4bc056a0558e98c7be5fb10 Author: Jingsong Lee <[email protected]> AuthorDate: Mon Jun 16 10:31:54 2025 +0800 [core] Introduce auto-buffer-spill for postpone bucket write (#5744) --- .../java/org/apache/paimon/KeyValueFileStore.java | 2 + .../org/apache/paimon/append/AppendOnlyWriter.java | 216 +++---------------- .../paimon/operation/AbstractFileStoreWrite.java | 2 +- .../postpone/PostponeBucketFileStoreWrite.java | 71 ++++++- .../paimon/postpone/PostponeBucketWriter.java | 141 +++++++++++-- .../java/org/apache/paimon/utils/SinkWriter.java | 234 +++++++++++++++++++++ .../paimon/table/PrimaryKeySimpleTableTest.java | 38 ++++ 7 files changed, 495 insertions(+), 209 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 81039da369..be4416af9b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -177,12 +177,14 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> { if (options.bucket() == BucketMode.POSTPONE_BUCKET) { return new PostponeBucketFileStoreWrite( fileIO, + pathFactory(), schema, commitUser, partitionType, keyType, valueType, this::pathFactory, + newReaderFactoryBuilder(), snapshotManager(), newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter), options, diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 694937c88f..3534ac4e9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -23,7 +23,6 @@ import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.compact.CompactManager; import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.RowBuffer; import org.apache.paimon.fileindex.FileIndexOptions; @@ -45,14 +44,15 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BatchRecordWriter; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.IOFunction; -import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.LongCounter; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.RecordWriter; +import org.apache.paimon.utils.SinkWriter; +import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter; +import org.apache.paimon.utils.SinkWriter.DirectSinkWriter; import javax.annotation.Nullable; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -72,7 +72,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { private final RowType writeSchema; private final DataFilePathFactory pathFactory; private final CompactManager compactManager; - private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> bucketFileRead; + private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead; private final boolean forceCompact; private final boolean asyncFileWrite; private final boolean statsDenseStore; @@ -80,17 +80,17 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { private final 
List<DataFileMeta> deletedFiles; private final List<DataFileMeta> compactBefore; private final List<DataFileMeta> compactAfter; - @Nullable private CompactDeletionFile compactDeletionFile; private final LongCounter seqNumCounter; private final String fileCompression; private final CompressOptions spillCompression; - private SinkWriter sinkWriter; private final SimpleColStatsCollector.Factory[] statsCollectors; @Nullable private final IOManager ioManager; private final FileIndexOptions fileIndexOptions; + private final MemorySize maxDiskSize; + @Nullable private CompactDeletionFile compactDeletionFile; + private SinkWriter<InternalRow> sinkWriter; private MemorySegmentPool memorySegmentPool; - private final MemorySize maxDiskSize; public AppendOnlyWriter( FileIO fileIO, @@ -101,7 +101,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { RowType writeSchema, long maxSequenceNumber, CompactManager compactManager, - IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> bucketFileRead, + IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead, boolean forceCompact, DataFilePathFactory pathFactory, @Nullable CommitIncrement increment, @@ -121,7 +121,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { this.writeSchema = writeSchema; this.pathFactory = pathFactory; this.compactManager = compactManager; - this.bucketFileRead = bucketFileRead; + this.dataFileRead = dataFileRead; this.forceCompact = forceCompact; this.asyncFileWrite = asyncFileWrite; this.statsDenseStore = statsDenseStore; @@ -139,8 +139,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { this.sinkWriter = useWriteBuffer - ? new BufferedSinkWriter(spillable, maxDiskSize, spillCompression) - : new DirectSinkWriter(); + ? 
createBufferedSinkWriter(spillable) + : new DirectSinkWriter<>(this::createRollingRowWriter); if (increment != null) { newFiles.addAll(increment.newFilesIncrement().newFiles()); @@ -151,6 +151,18 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { } } + private BufferedSinkWriter<InternalRow> createBufferedSinkWriter(boolean spillable) { + return new BufferedSinkWriter<>( + this::createRollingRowWriter, + t -> t, + t -> t, + ioManager, + writeSchema, + spillable, + maxDiskSize, + spillCompression); + } + @Override public void write(InternalRow rowData) throws Exception { Preconditions.checkArgument( @@ -178,7 +190,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { write(row); } } else { - ((DirectSinkWriter) sinkWriter).writeBundle(bundle); + ((DirectSinkWriter<?>) sinkWriter).writeBundle(bundle); } } @@ -252,16 +264,16 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { } public void toBufferedWriter() throws Exception { - if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && bucketFileRead != null) { + if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && dataFileRead != null) { // fetch the written results List<DataFileMeta> files = sinkWriter.flush(); sinkWriter.close(); - sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression); + sinkWriter = createBufferedSinkWriter(true); sinkWriter.setMemoryPool(memorySegmentPool); // rewrite small files - try (RecordReaderIterator<InternalRow> reader = bucketFileRead.apply(files)) { + try (RecordReaderIterator<InternalRow> reader = dataFileRead.apply(files)) { while (reader.hasNext()) { sinkWriter.write(reader.next()); } @@ -356,7 +368,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { @VisibleForTesting public RowBuffer getWriteBuffer() { if (sinkWriter instanceof BufferedSinkWriter) { - return ((BufferedSinkWriter) sinkWriter).writeBuffer; + return ((BufferedSinkWriter<?>) sinkWriter).rowBuffer(); } else { return null; } @@ -366,176 +378,4 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { List<DataFileMeta> getNewFiles() { return newFiles; } - - /** Internal interface to Sink Data from input. */ - private interface SinkWriter { - - boolean write(InternalRow data) throws IOException; - - List<DataFileMeta> flush() throws IOException; - - boolean flushMemory() throws IOException; - - long memoryOccupancy(); - - void close(); - - void setMemoryPool(MemorySegmentPool memoryPool); - - boolean bufferSpillableWriter(); - } - - /** - * Directly sink data to file, no memory cache here, use OrcWriter/ParquetWrite/etc directly - * write data. May cause out-of-memory. 
- */ - private class DirectSinkWriter implements SinkWriter { - - private RowDataRollingFileWriter writer; - - @Override - public boolean write(InternalRow data) throws IOException { - if (writer == null) { - writer = createRollingRowWriter(); - } - writer.write(data); - return true; - } - - public void writeBundle(BundleRecords bundle) throws IOException { - if (writer == null) { - writer = createRollingRowWriter(); - } - writer.writeBundle(bundle); - } - - @Override - public List<DataFileMeta> flush() throws IOException { - List<DataFileMeta> flushedFiles = new ArrayList<>(); - if (writer != null) { - writer.close(); - flushedFiles.addAll(writer.result()); - writer = null; - } - return flushedFiles; - } - - @Override - public boolean flushMemory() throws IOException { - return false; - } - - @Override - public long memoryOccupancy() { - return 0; - } - - @Override - public void close() { - if (writer != null) { - writer.abort(); - writer = null; - } - } - - @Override - public void setMemoryPool(MemorySegmentPool memoryPool) { - // do nothing - } - - @Override - public boolean bufferSpillableWriter() { - return false; - } - } - - /** - * Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint - * acknowledge time. When non-spillable, may cause too many small files. - */ - private class BufferedSinkWriter implements SinkWriter { - - private final boolean spillable; - - private final MemorySize maxDiskSize; - - private final CompressOptions compression; - - private RowBuffer writeBuffer; - - private BufferedSinkWriter( - boolean spillable, MemorySize maxDiskSize, CompressOptions compression) { - this.spillable = spillable; - this.maxDiskSize = maxDiskSize; - this.compression = compression; - } - - @Override - public boolean write(InternalRow data) throws IOException { - return writeBuffer.put(data); - } - - @Override - public List<DataFileMeta> flush() throws IOException { - List<DataFileMeta> flushedFiles = new ArrayList<>(); - if (writeBuffer != null) { - writeBuffer.complete(); - RowDataRollingFileWriter writer = createRollingRowWriter(); - IOException exception = null; - try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) { - while (iterator.advanceNext()) { - writer.write(iterator.getRow()); - } - } catch (IOException e) { - exception = e; - } finally { - if (exception != null) { - IOUtils.closeQuietly(writer); - // cleanup code that might throw another exception - throw exception; - } - writer.close(); - } - flushedFiles.addAll(writer.result()); - // reuse writeBuffer - writeBuffer.reset(); - } - return flushedFiles; - } - - @Override - public long memoryOccupancy() { - return writeBuffer.memoryOccupancy(); - } - - @Override - public void close() { - if (writeBuffer != null) { - writeBuffer.reset(); - writeBuffer = null; - } - } - - @Override - public void setMemoryPool(MemorySegmentPool memoryPool) { - writeBuffer = - RowBuffer.getBuffer( - ioManager, - memoryPool, - new InternalRowSerializer(writeSchema), - spillable, - maxDiskSize, - compression); - } - - @Override - public boolean bufferSpillableWriter() { - return spillable; - } - - @Override - public boolean flushMemory() throws IOException { - return writeBuffer.flushMemory(); - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index fad898da45..a7bac3f12a 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -567,7 +567,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { } @VisibleForTesting - Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() { + public Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() { return writers; } diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java index 09b3afdff2..97b5dfe7c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java @@ -26,11 +26,15 @@ import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.avro.AvroSchemaConverter; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; -import org.apache.paimon.operation.AbstractFileStoreWrite; +import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.FileStoreWrite; +import org.apache.paimon.operation.MemoryFileStoreWrite; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.ReaderSupplier; +import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; @@ -39,10 +43,17 @@ import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiFunction; @@ -52,25 +63,37 @@ import static org.apache.paimon.format.FileFormat.fileFormat; import static org.apache.paimon.utils.FileStorePathFactory.createFormatPathFactories; /** {@link FileStoreWrite} for {@code bucket = -2} tables. 
*/ -public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { +public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { + + private static final Logger LOG = LoggerFactory.getLogger(PostponeBucketFileStoreWrite.class); private final CoreOptions options; private final KeyValueFileWriterFactory.Builder writerFactoryBuilder; + private final FileIO fileIO; + private final FileStorePathFactory pathFactory; + private final KeyValueFileReaderFactory.Builder readerFactoryBuilder; + + private boolean forceBufferSpill = false; public PostponeBucketFileStoreWrite( FileIO fileIO, + FileStorePathFactory pathFactory, TableSchema schema, String commitUser, RowType partitionType, RowType keyType, RowType valueType, BiFunction<CoreOptions, String, FileStorePathFactory> formatPathFactory, + KeyValueFileReaderFactory.Builder readerFactoryBuilder, SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, String tableName, @Nullable Integer writeId) { - super(snapshotManager, scan, null, null, tableName, options, partitionType); + super(snapshotManager, scan, options, partitionType, null, null, tableName); + this.fileIO = fileIO; + this.pathFactory = pathFactory; + this.readerFactoryBuilder = readerFactoryBuilder; Options newOptions = new Options(options.toMap()); try { @@ -120,6 +143,25 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu withIgnorePreviousFiles(true); } + @Override + protected void forceBufferSpill() throws Exception { + if (ioManager == null) { + return; + } + if (forceBufferSpill) { + return; + } + forceBufferSpill = true; + LOG.info( + "Force buffer spill for postpone file store write, writer number is: {}", + writers.size()); + for (Map<Integer, WriterContainer<KeyValue>> bucketWriters : writers.values()) { + for (WriterContainer<KeyValue> writerContainer : bucketWriters.values()) { + ((PostponeBucketWriter) writerContainer.writer).toBufferedWriter(); + } + } + } + @Override public void withIgnorePreviousFiles(boolean ignorePrevious) { // see comments in constructor @@ -141,7 +183,28 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu "Postpone bucket writers should not restore previous files. 
This is unexpected."); KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket, options); - return new PostponeBucketWriter(writerFactory, restoreIncrement); + return new PostponeBucketWriter( + fileIO, + pathFactory.createDataFilePathFactory(partition, bucket), + options.spillCompressOptions(), + options.writeBufferSpillDiskSize(), + ioManager, + writerFactory, + files -> newFileRead(partition, bucket, files), + forceBufferSpill, + forceBufferSpill, + restoreIncrement); + } + + private RecordReaderIterator<KeyValue> newFileRead( + BinaryRow partition, int bucket, List<DataFileMeta> files) throws IOException { + KeyValueFileReaderFactory readerFactory = + readerFactoryBuilder.build(partition, bucket, name -> Optional.empty()); + List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>(); + for (DataFileMeta file : files) { + suppliers.add(() -> readerFactory.createRecordReader(file)); + } + return new RecordReaderIterator<>(ConcatRecordReader.create(suppliers)); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java index 10259650d4..c41db8aac7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java @@ -19,14 +19,29 @@ package org.apache.paimon.postpone; import org.apache.paimon.KeyValue; +import org.apache.paimon.KeyValueSerializer; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.compression.CompressOptions; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.memory.MemoryOwner; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; +import org.apache.paimon.utils.IOFunction; import org.apache.paimon.utils.RecordWriter; +import org.apache.paimon.utils.SinkWriter; +import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter; +import org.apache.paimon.utils.SinkWriter.DirectSinkWriter; import javax.annotation.Nullable; @@ -36,30 +51,69 @@ import java.util.Collections; import java.util.List; /** {@link RecordWriter} for {@code bucket = -2} tables. 
*/ -public class PostponeBucketWriter implements RecordWriter<KeyValue> { +public class PostponeBucketWriter implements RecordWriter<KeyValue>, MemoryOwner { + private final FileIO fileIO; + private final DataFilePathFactory pathFactory; private final KeyValueFileWriterFactory writerFactory; private final List<DataFileMeta> files; + private final IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead; + private final @Nullable IOManager ioManager; + private final CompressOptions spillCompression; + private final MemorySize maxDiskSize; - private RollingFileWriter<KeyValue, DataFileMeta> writer; + private SinkWriter<KeyValue> sinkWriter; + private MemorySegmentPool memorySegmentPool; public PostponeBucketWriter( - KeyValueFileWriterFactory writerFactory, @Nullable CommitIncrement restoreIncrement) { + FileIO fileIO, + DataFilePathFactory pathFactory, + CompressOptions spillCompression, + MemorySize maxDiskSize, + @Nullable IOManager ioManager, + KeyValueFileWriterFactory writerFactory, + IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead, + boolean useWriteBuffer, + boolean spillable, + @Nullable CommitIncrement restoreIncrement) { + this.ioManager = ioManager; this.writerFactory = writerFactory; + this.fileRead = fileRead; + this.fileIO = fileIO; + this.pathFactory = pathFactory; + this.spillCompression = spillCompression; + this.maxDiskSize = maxDiskSize; this.files = new ArrayList<>(); if (restoreIncrement != null) { files.addAll(restoreIncrement.newFilesIncrement().newFiles()); } + this.sinkWriter = + useWriteBuffer + ? createBufferedSinkWriter(spillable) + : new DirectSinkWriter<>(this::createRollingRowWriter); + } - this.writer = null; + private RollingFileWriter<KeyValue, DataFileMeta> createRollingRowWriter() { + return writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); } @Override public void write(KeyValue record) throws Exception { - if (writer == null) { - writer = writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); + boolean success = sinkWriter.write(record); + if (!success) { + flush(); + success = sinkWriter.write(record); + if (!success) { + // Should not get here, because writeBuffer will throw too big exception out. + // But we throw again in case of something unexpected happens. (like someone changed + // code in SpillableBuffer.) 
+ throw new RuntimeException("Mem table is too small to hold a single element."); + } } - writer.write(record); + } + + private void flush() throws Exception { + files.addAll(sinkWriter.flush()); } @Override @@ -82,13 +136,66 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> { } @Override - public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { - if (writer != null) { - writer.close(); - files.addAll(writer.result()); - writer = null; + public void setMemoryPool(MemorySegmentPool memoryPool) { + this.memorySegmentPool = memoryPool; + sinkWriter.setMemoryPool(memoryPool); + } + + @Override + public long memoryOccupancy() { + return sinkWriter.memoryOccupancy(); + } + + @Override + public void flushMemory() throws Exception { + boolean success = sinkWriter.flushMemory(); + if (!success) { + flush(); } + } + private BufferedSinkWriter<KeyValue> createBufferedSinkWriter(boolean spillable) { + RowType keyType = writerFactory.keyType(); + RowType valueType = writerFactory.valueType(); + RowType kvRowType = KeyValue.schema(keyType, valueType); + KeyValueSerializer serializer = new KeyValueSerializer(keyType, valueType); + return new BufferedSinkWriter<>( + this::createRollingRowWriter, + serializer::toRow, + serializer::fromRow, + ioManager, + kvRowType, + spillable, + maxDiskSize, + spillCompression); + } + + public void toBufferedWriter() throws Exception { + if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && fileRead != null) { + // fetch the written results + List<DataFileMeta> files = sinkWriter.flush(); + + sinkWriter.close(); + sinkWriter = createBufferedSinkWriter(true); + sinkWriter.setMemoryPool(memorySegmentPool); + + // rewrite small files + try (RecordReaderIterator<KeyValue> reader = fileRead.apply(files)) { + while (reader.hasNext()) { + sinkWriter.write(reader.next()); + } + } finally { + // remove small files + for (DataFileMeta file : files) { + fileIO.deleteQuietly(pathFactory.toPath(file)); + } + } + } + } + + @Override + public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { + flush(); List<DataFileMeta> result = new ArrayList<>(files); files.clear(); return new CommitIncrement( @@ -97,6 +204,11 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> { null); } + @VisibleForTesting + public boolean useBufferedSinkWriter() { + return sinkWriter instanceof BufferedSinkWriter; + } + @Override public boolean compactNotCompleted() { return false; @@ -107,9 +219,6 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> { @Override public void close() throws Exception { - if (writer != null) { - writer.abort(); - writer = null; - } + sinkWriter.close(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java new file mode 100644 index 0000000000..c4596a1df4 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.compression.CompressOptions; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.disk.RowBuffer; +import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.types.RowType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; + +/** Internal interface to Sink Data from input. */ +public interface SinkWriter<T> { + + boolean write(T data) throws IOException; + + List<DataFileMeta> flush() throws IOException; + + boolean flushMemory() throws IOException; + + long memoryOccupancy(); + + void close(); + + void setMemoryPool(MemorySegmentPool memoryPool); + + boolean bufferSpillableWriter(); + + /** + * Directly sink data to file, no memory cache here, use OrcWriter/ParquetWrite/etc directly + * write data. May cause out-of-memory. + */ + class DirectSinkWriter<T> implements SinkWriter<T> { + + private final Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier; + + private RollingFileWriter<T, DataFileMeta> writer; + + public DirectSinkWriter(Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier) { + this.writerSupplier = writerSupplier; + } + + @Override + public boolean write(T data) throws IOException { + if (writer == null) { + writer = writerSupplier.get(); + } + writer.write(data); + return true; + } + + public void writeBundle(BundleRecords bundle) throws IOException { + if (writer == null) { + writer = writerSupplier.get(); + } + writer.writeBundle(bundle); + } + + @Override + public List<DataFileMeta> flush() throws IOException { + List<DataFileMeta> flushedFiles = new ArrayList<>(); + if (writer != null) { + writer.close(); + flushedFiles.addAll(writer.result()); + writer = null; + } + return flushedFiles; + } + + @Override + public boolean flushMemory() throws IOException { + return false; + } + + @Override + public long memoryOccupancy() { + return 0; + } + + @Override + public void close() { + if (writer != null) { + writer.abort(); + writer = null; + } + } + + @Override + public void setMemoryPool(MemorySegmentPool memoryPool) { + // do nothing + } + + @Override + public boolean bufferSpillableWriter() { + return false; + } + } + + /** + * Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint + * acknowledge time. When non-spillable, may cause too many small files. 
+ */ + class BufferedSinkWriter<T> implements SinkWriter<T> { + + private final Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier; + private final Function<T, InternalRow> toRow; + private final Function<InternalRow, T> fromRow; + private final IOManager ioManager; + private final RowType rowType; + private final boolean spillable; + private final MemorySize maxDiskSize; + private final CompressOptions compression; + + private RowBuffer writeBuffer; + + public BufferedSinkWriter( + Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier, + Function<T, InternalRow> toRow, + Function<InternalRow, T> fromRow, + IOManager ioManager, + RowType rowType, + boolean spillable, + MemorySize maxDiskSize, + CompressOptions compression) { + this.writerSupplier = writerSupplier; + this.toRow = toRow; + this.fromRow = fromRow; + this.ioManager = ioManager; + this.rowType = rowType; + this.spillable = spillable; + this.maxDiskSize = maxDiskSize; + this.compression = compression; + } + + public RowBuffer rowBuffer() { + return writeBuffer; + } + + @Override + public boolean write(T data) throws IOException { + return writeBuffer.put(toRow.apply(data)); + } + + @Override + public List<DataFileMeta> flush() throws IOException { + List<DataFileMeta> flushedFiles = new ArrayList<>(); + if (writeBuffer != null) { + writeBuffer.complete(); + RollingFileWriter<T, DataFileMeta> writer = writerSupplier.get(); + IOException exception = null; + try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) { + while (iterator.advanceNext()) { + writer.write(fromRow.apply(iterator.getRow())); + } + } catch (IOException e) { + exception = e; + } finally { + if (exception != null) { + IOUtils.closeQuietly(writer); + // cleanup code that might throw another exception + throw exception; + } + writer.close(); + } + flushedFiles.addAll(writer.result()); + // reuse writeBuffer + writeBuffer.reset(); + } + return flushedFiles; + } + + @Override + public long memoryOccupancy() { + return writeBuffer.memoryOccupancy(); + } + + @Override + public void close() { + if (writeBuffer != null) { + writeBuffer.reset(); + writeBuffer = null; + } + } + + @Override + public void setMemoryPool(MemorySegmentPool memoryPool) { + writeBuffer = + RowBuffer.getBuffer( + ioManager, + memoryPool, + new InternalRowSerializer(rowType), + spillable, + maxDiskSize, + compression); + } + + @Override + public boolean bufferSpillableWriter() { + return spillable; + } + + @Override + public boolean flushMemory() throws IOException { + return writeBuffer.flushMemory(); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index 0bf4971f94..35928e2335 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -21,6 +21,7 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.LookupLocalFileType; +import org.apache.paimon.KeyValue; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; @@ -34,9 +35,12 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; +import 
org.apache.paimon.operation.AbstractFileStoreWrite; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.postpone.PostponeBucketFileStoreWrite; +import org.apache.paimon.postpone.PostponeBucketWriter; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; @@ -54,6 +58,7 @@ import org.apache.paimon.table.sink.InnerTableCommit; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.sink.WriteSelector; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableRead; @@ -136,6 +141,39 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link PrimaryKeyFileStoreTable}. */ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase { + @Test + public void testPostponeBucketWithManyPartitions() throws Exception { + FileStoreTable table = + createFileStoreTable(options -> options.set(BUCKET, BucketMode.POSTPONE_BUCKET)); + + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.withIOManager(new IOManagerImpl(tempDir.toString())); + for (int i = 0; i < 100; i++) { + write.write(rowData(i, i, (long) i)); + } + + for (Map<Integer, AbstractFileStoreWrite.WriterContainer<KeyValue>> bucketWriters : + ((PostponeBucketFileStoreWrite) ((TableWriteImpl<?>) write).getWrite()) + .writers() + .values()) { + for (AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer : + bucketWriters.values()) { + PostponeBucketWriter writer = (PostponeBucketWriter) writerContainer.writer; + assertThat(writer.useBufferedSinkWriter()).isTrue(); + } + } + commit.commit(write.prepareCommit()); + } + + Snapshot snapshot = table.latestSnapshot().get(); + ManifestFileMeta manifest = + table.manifestListReader().read(snapshot.deltaManifestList()).get(0); + List<ManifestEntry> entries = table.manifestFileReader().read(manifest.fileName()); + assertThat(entries.size()).isEqualTo(100); + } + @Test public void testPostponeBucket() throws Exception { FileStoreTable table =

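To summarize the flow this patch introduces, here is a minimal, self-contained sketch (hypothetical names, not Paimon classes): a postpone-bucket writer starts with a direct writer and, when buffer spilling is forced, switches to a spillable buffered writer, rewriting the records it has already produced. MiniSinkWriter, MiniDirectWriter, MiniBufferedWriter and MiniPostponeWriter below are illustrative stand-ins for the SinkWriter, DirectSinkWriter, BufferedSinkWriter and PostponeBucketWriter classes in the diff above; the sketch collapses the real RowBuffer/MemorySegmentPool machinery into a simple bounded list.

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.paimon.utils.SinkWriter (not the real Paimon API).
interface MiniSinkWriter {
    boolean write(String record); // false: buffer full, caller must flush and retry
    List<String> flush();         // "rolls" the current file and returns its records
    boolean spillable();
}

// Like DirectSinkWriter: no buffering, never rejects a write.
class MiniDirectWriter implements MiniSinkWriter {
    private final List<String> current = new ArrayList<>();
    public boolean write(String record) { return current.add(record); }
    public List<String> flush() {
        List<String> file = new ArrayList<>(current);
        current.clear();
        return file;
    }
    public boolean spillable() { return false; }
}

// Like BufferedSinkWriter: bounded buffer that asks the caller to flush when full.
class MiniBufferedWriter implements MiniSinkWriter {
    private final int capacity;
    private final List<String> buffer = new ArrayList<>();
    MiniBufferedWriter(int capacity) { this.capacity = capacity; }
    public boolean write(String record) {
        return buffer.size() < capacity && buffer.add(record); // false mimics RowBuffer.put
    }
    public List<String> flush() {
        List<String> file = new ArrayList<>(buffer);
        buffer.clear();
        return file;
    }
    public boolean spillable() { return true; }
}

// Like PostponeBucketWriter: starts direct, switches to buffered when spilling is forced.
public class MiniPostponeWriter {
    private final List<List<String>> files = new ArrayList<>();
    private MiniSinkWriter sinkWriter = new MiniDirectWriter();

    void write(String record) {
        if (!sinkWriter.write(record)) {
            files.add(sinkWriter.flush()); // roll a file, then retry once
            if (!sinkWriter.write(record)) {
                throw new RuntimeException("buffer too small to hold a single element");
            }
        }
    }

    // Rough analogue of toBufferedWriter(), driven by forceBufferSpill() in the diff.
    void toBufferedWriter() {
        if (!sinkWriter.spillable()) {
            List<String> rewritten = sinkWriter.flush(); // fetch what was already written
            sinkWriter = new MiniBufferedWriter(1024);
            rewritten.forEach(sinkWriter::write);        // rewrite it into the new buffer
        }
    }

    // Rough analogue of prepareCommit(): flush and hand back all produced "files".
    List<List<String>> prepareCommit() {
        files.add(sinkWriter.flush());
        List<List<String>> result = new ArrayList<>(files);
        files.clear();
        return result;
    }

    public static void main(String[] args) {
        MiniPostponeWriter writer = new MiniPostponeWriter();
        for (int i = 0; i < 10; i++) {
            writer.write("record-" + i);
        }
        writer.toBufferedWriter(); // simulate memory pressure from the write buffer pool
        writer.write("record-after-spill");
        System.out.println("files committed: " + writer.prepareCommit().size());
    }
}

In the actual patch, the switch is triggered by forceBufferSpill() in PostponeBucketFileStoreWrite calling PostponeBucketWriter#toBufferedWriter(), and a full buffer is signaled by the buffered writer's write() returning false; the sketch models both with plain boolean returns.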