This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3d738a9b8a [format] Introduce 'write.batch-memory' to control memory in arrow (#5988)
3d738a9b8a is described below
commit 3d738a9b8a7ed7260e4d59c400a053d8bdfc845e
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jul 31 12:10:22 2025 +0800
[format] Introduce 'write.batch-memory' to control memory in arrow (#5988)
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 6 +++
.../paimon/arrow/vector/ArrowFormatCWriter.java | 14 +++++-
.../paimon/arrow/vector/ArrowFormatWriter.java | 43 +++++++++++++++--
.../paimon/arrow/vector/ArrowFormatWriterTest.java | 54 ++++++++++++++++++++++
.../java/org/apache/paimon/format/FileFormat.java | 1 +
.../apache/paimon/format/FileFormatFactory.java | 18 +++++++-
.../format/parquet/ParquetFileFormatTest.java | 4 +-
.../paimon/format/lance/LanceFileFormat.java | 7 ++-
9 files changed, 144 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 85fa54aa63..c287901335 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1248,6 +1248,12 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td>Boolean</td>
<td>If set to true, compactions and snapshot expiration will be
skipped. This option is used along with dedicated compact jobs.</td>
</tr>
+ <tr>
+ <td><h5>write.batch-memory</h5></td>
+ <td style="word-wrap: break-word;">128 mb</td>
+ <td>MemorySize</td>
+ <td>Write batch memory for any file format if it supports.</td>
+ </tr>
<tr>
<td><h5>write.batch-size</h5></td>
<td style="word-wrap: break-word;">1024</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ca2c67d0f4..54802731b4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1179,6 +1179,12 @@ public class CoreOptions implements Serializable {
.withFallbackKeys("orc.write.batch-size")
.withDescription("Write batch size for any file format if it supports.");
+ public static final ConfigOption<MemorySize> WRITE_BATCH_MEMORY =
+ key("write.batch-memory")
+ .memoryType()
+ .defaultValue(MemorySize.parse("128 mb"))
+ .withDescription("Write batch memory for any file format if it supports.");
+
public static final ConfigOption<String> CONSUMER_ID =
key("consumer-id")
.stringType()
diff --git
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index 2f1e2f2a53..afa58250c3 100644
---
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -27,6 +27,8 @@ import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
+import javax.annotation.Nullable;
+
/**
* This writer could flush to c struct, but you need to release it, except it has been released in c
* code.
@@ -38,12 +40,20 @@ public class ArrowFormatCWriter implements AutoCloseable {
private final ArrowFormatWriter realWriter;
public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean
caseSensitive) {
- this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive));
+ this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive,
null));
+ }
+
+ public ArrowFormatCWriter(
+ RowType rowType,
+ int writeBatchSize,
+ boolean caseSensitive,
+ @Nullable Long memoryUsedMaxInVSR) {
+ this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive,
memoryUsedMaxInVSR));
}
public ArrowFormatCWriter(
RowType rowType, int writeBatchSize, boolean caseSensitive,
BufferAllocator allocator) {
- this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive,
allocator));
+ this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive,
allocator, null));
}
private ArrowFormatCWriter(ArrowFormatWriter arrowFormatWriter) {
diff --git
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index a957fa2288..44c764804b 100644
---
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -27,11 +27,14 @@ import org.apache.paimon.types.RowType;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.util.OversizedAllocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
/** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */
public class ArrowFormatWriter implements AutoCloseable {
@@ -39,18 +42,29 @@ public class ArrowFormatWriter implements AutoCloseable {
private final VectorSchemaRoot vectorSchemaRoot;
private final ArrowFieldWriter[] fieldWriters;
-
private final int batchSize;
-
private final BufferAllocator allocator;
+ @Nullable private final Long memoryUsedMaxInBytes;
private int rowId;
public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean
caseSensitive) {
- this(rowType, writeBatchSize, caseSensitive, new RootAllocator());
+ this(rowType, writeBatchSize, caseSensitive, new RootAllocator(),
null);
}
public ArrowFormatWriter(
- RowType rowType, int writeBatchSize, boolean caseSensitive,
BufferAllocator allocator) {
+ RowType rowType,
+ int writeBatchSize,
+ boolean caseSensitive,
+ @Nullable Long memoryUsedMaxInBytes) {
+ this(rowType, writeBatchSize, caseSensitive, new RootAllocator(),
memoryUsedMaxInBytes);
+ }
+
+ public ArrowFormatWriter(
+ RowType rowType,
+ int writeBatchSize,
+ boolean caseSensitive,
+ BufferAllocator allocator,
+ @Nullable Long memoryUsedMaxInBytes) {
this.allocator = allocator;
vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType,
allocator, caseSensitive);
@@ -65,6 +79,7 @@ public class ArrowFormatWriter implements AutoCloseable {
}
this.batchSize = writeBatchSize;
+ this.memoryUsedMaxInBytes = memoryUsedMaxInBytes;
}
public void flush() {
@@ -75,6 +90,17 @@ public class ArrowFormatWriter implements AutoCloseable {
if (rowId >= batchSize) {
return false;
}
+ if (memoryUsedMaxInBytes != null && rowId % 32 == 0) {
+ long memoryUsed = memoryUsed();
+ if (memoryUsed > memoryUsedMaxInBytes) {
+ LOG.debug(
+ "Memory used by ArrowFormatCWriter exceeds the limit: {} > {} while writing record row id: {}",
+ memoryUsed,
+ memoryUsedMaxInBytes,
+ rowId);
+ return false;
+ }
+ }
for (int i = 0; i < currentRow.getFieldCount(); i++) {
try {
fieldWriters[i].write(rowId, currentRow, i);
@@ -89,6 +115,15 @@ public class ArrowFormatWriter implements AutoCloseable {
return true;
}
+ public long memoryUsed() {
+ vectorSchemaRoot.setRowCount(rowId);
+ long memoryUsed = 0;
+ for (FieldVector fieldVector : vectorSchemaRoot.getFieldVectors()) {
+ memoryUsed += fieldVector.getBufferSize();
+ }
+ return memoryUsed;
+ }
+
public boolean empty() {
return rowId == 0;
}
diff --git
a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index d7e857c111..d7ee8ca1a5 100644
---
a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
@@ -165,6 +166,59 @@ public class ArrowFormatWriterTest {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testWriteWithMemoryLimit(boolean limitMemory) {
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.BYTES()),
+ new DataField(1, "f1", DataTypes.BYTES())));
+ Long memoryLimit = limitMemory ? 100 * 1024 * 1024L : null;
+ try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 4096, true, memoryLimit)) {
+
+ GenericRow genericRow = new GenericRow(2);
+ genericRow.setField(0, randomBytes(1024 * 1024, 1024 * 1024));
+ genericRow.setField(1, randomBytes(1024 * 1024, 1024 * 1024));
+
+ // normal write
+ for (int i = 0; i < 200; i++) {
+ boolean success = writer.write(genericRow);
+ if (!success) {
+ writer.flush();
+ writer.reset();
+ writer.write(genericRow);
+ }
+ }
+ writer.reset();
+
+ if (limitMemory) {
+ for (int i = 0; i < 64; i++) {
+ Assertions.assertThat(writer.write(genericRow)).isTrue();
+ }
+ Assertions.assertThat(writer.write(genericRow)).isFalse();
+ }
+ writer.reset();
+
+ // Write batch records
+ for (int i = 0; i < 2000; i++) {
+ boolean success = writer.write(genericRow);
+ if (!success) {
+ writer.flush();
+ writer.reset();
+ writer.write(genericRow);
+ }
+ }
+
+ if (limitMemory) {
+ Assertions.assertThat(writer.memoryUsed()).isLessThan(memoryLimit);
+ Assertions.assertThat(writer.getAllocator().getAllocatedMemory())
+ .isGreaterThan(memoryLimit)
+ .isLessThan(2 * memoryLimit);
+ }
+ }
+ }
+
@Test
public void testArrowBundleRecords() {
try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE,
4096, true)) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index bef47edbf3..7ee33e6843 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -84,6 +84,7 @@ public abstract class FileFormat {
options,
options.get(CoreOptions.READ_BATCH_SIZE),
options.get(CoreOptions.WRITE_BATCH_SIZE),
+ options.get(CoreOptions.WRITE_BATCH_MEMORY),
options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL),
options.get(CoreOptions.FILE_BLOCK_SIZE)));
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
index b726a84f24..79354b9c26 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
@@ -37,23 +37,35 @@ public interface FileFormatFactory {
private final Options options;
private final int readBatchSize;
private final int writeBatchSize;
+ private final MemorySize writeBatchMemory;
private final int zstdLevel;
@Nullable private final MemorySize blockSize;
@VisibleForTesting
public FormatContext(Options options, int readBatchSize, int
writeBatchSize) {
- this(options, readBatchSize, writeBatchSize, 1, null);
+ this(options, readBatchSize, writeBatchSize,
MemorySize.VALUE_128_MB, 1, null);
}
+ @VisibleForTesting
public FormatContext(
Options options,
int readBatchSize,
int writeBatchSize,
+ MemorySize writeBatchMemory) {
+ this(options, readBatchSize, writeBatchSize, writeBatchMemory, 1,
null);
+ }
+
+ public FormatContext(
+ Options options,
+ int readBatchSize,
+ int writeBatchSize,
+ MemorySize writeBatchMemory,
int zstdLevel,
@Nullable MemorySize blockSize) {
this.options = options;
this.readBatchSize = readBatchSize;
this.writeBatchSize = writeBatchSize;
+ this.writeBatchMemory = writeBatchMemory;
this.zstdLevel = zstdLevel;
this.blockSize = blockSize;
}
@@ -70,6 +82,10 @@ public interface FileFormatFactory {
return writeBatchSize;
}
+ public MemorySize writeBatchMemory() {
+ return writeBatchMemory;
+ }
+
public int zstdLevel() {
return zstdLevel;
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
index b51f8fc05d..72159c1ad6 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
@@ -22,6 +22,7 @@ import
org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -48,7 +49,8 @@ public class ParquetFileFormatTest {
Options options = new Options();
options.set(parquetKey, "hello");
options.set(otherKey, "test");
- FormatContext context = new FormatContext(options, 1024, 1024, 2,
null);
+ FormatContext context =
+ new FormatContext(options, 1024, 1024,
MemorySize.VALUE_128_MB, 2, null);
Options actual = new ParquetFileFormat(context).getOptions();
assertThat(actual.get(parquetKey)).isEqualTo("hello");
diff --git
a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java
b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java
index 72f99b2a32..c476652640 100644
---
a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java
+++
b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java
@@ -75,7 +75,12 @@ public class LanceFileFormat extends FileFormat {
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
return new LanceWriterFactory(
- () -> new ArrowFormatWriter(type,
formatContext.writeBatchSize(), true));
+ () ->
+ new ArrowFormatWriter(
+ type,
+ formatContext.writeBatchSize(),
+ true,
+ formatContext.writeBatchMemory().getBytes()));
}
@Override