This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 737da60bb3 [arrow] support ArrowFormatWriter to use an external 
BufferAllocator to provide finer-grained memory control and monitor allocation 
events. (#5279)
737da60bb3 is described below

commit 737da60bb320930fa0d8f264643b6bc7561a9719
Author: liming.1018 <[email protected]>
AuthorDate: Thu Mar 13 17:31:11 2025 +0800

    [arrow] support ArrowFormatWriter to use an external BufferAllocator to 
provide finer-grained memory control and monitor allocation events. (#5279)
---
 .../paimon/arrow/vector/ArrowFormatCWriter.java    | 17 ++++--
 .../paimon/arrow/vector/ArrowFormatWriter.java     | 12 +++-
 .../paimon/arrow/vector/ArrowFormatWriterTest.java | 68 +++++++++++++++-------
 3 files changed, 69 insertions(+), 28 deletions(-)

diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index eb7634fcbf..9dfbbf5782 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.types.RowType;
 
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
-import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 
 /**
@@ -38,8 +38,17 @@ public class ArrowFormatCWriter implements AutoCloseable {
     private final ArrowFormatWriter realWriter;
 
     public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean 
caseSensitive) {
-        this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, 
caseSensitive);
-        RootAllocator allocator = realWriter.getAllocator();
+        this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive));
+    }
+
+    public ArrowFormatCWriter(
+            RowType rowType, int writeBatchSize, boolean caseSensitive, 
BufferAllocator allocator) {
+        this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, 
allocator));
+    }
+
+    private ArrowFormatCWriter(ArrowFormatWriter arrowFormatWriter) {
+        this.realWriter = arrowFormatWriter;
+        BufferAllocator allocator = realWriter.getAllocator();
         array = ArrowArray.allocateNew(allocator);
         schema = ArrowSchema.allocateNew(allocator);
     }
@@ -79,7 +88,7 @@ public class ArrowFormatCWriter implements AutoCloseable {
         return realWriter.getVectorSchemaRoot();
     }
 
-    public RootAllocator getAllocator() {
+    public BufferAllocator getAllocator() {
         return realWriter.getAllocator();
     }
 }
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index 6bdc234204..a377dd9f04 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.types.RowType;
 
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.util.OversizedAllocationException;
@@ -40,11 +41,16 @@ public class ArrowFormatWriter implements AutoCloseable {
 
     private final int batchSize;
 
-    private final RootAllocator allocator;
+    private final BufferAllocator allocator;
     private int rowId;
 
     public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean 
caseSensitive) {
-        allocator = new RootAllocator();
+        this(rowType, writeBatchSize, caseSensitive, new RootAllocator());
+    }
+
+    public ArrowFormatWriter(
+            RowType rowType, int writeBatchSize, boolean caseSensitive, 
BufferAllocator allocator) {
+        this.allocator = allocator;
 
         vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, 
allocator, caseSensitive);
 
@@ -105,7 +111,7 @@ public class ArrowFormatWriter implements AutoCloseable {
         return vectorSchemaRoot;
     }
 
-    public RootAllocator getAllocator() {
+    public BufferAllocator getAllocator() {
         return allocator;
     }
 }
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index f4eee75185..63e53ca9b8 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -30,10 +30,15 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -193,36 +198,57 @@ public class ArrowFormatWriterTest {
     @Test
     public void testCWriter() {
         try (ArrowFormatCWriter writer = new 
ArrowFormatCWriter(PRIMITIVE_TYPE, 4096, true)) {
-            List<InternalRow> list = new ArrayList<>();
-            List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();
+            writeAndCheck(writer);
+        }
+    }
 
-            for (int i = 0; i < PRIMITIVE_TYPE.getFieldCount(); i++) {
-                
fieldGetters.add(InternalRow.createFieldGetter(PRIMITIVE_TYPE.getTypeAt(i), i));
-            }
-            for (int i = 0; i < 1000; i++) {
-                list.add(GenericRow.of(randomRowValues(null)));
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testWriteWithExternalAllocator(boolean allocationFailed) {
+        long maxAllocation = allocationFailed ? 1024L : Long.MAX_VALUE;
+        try (RootAllocator rootAllocator = new RootAllocator();
+                BufferAllocator allocator =
+                        rootAllocator.newChildAllocator("paimonWriter", 0, 
maxAllocation);
+                ArrowFormatCWriter writer =
+                        new ArrowFormatCWriter(PRIMITIVE_TYPE, 4096, true, 
allocator)) {
+            writeAndCheck(writer);
+        } catch (OutOfMemoryException e) {
+            if (!allocationFailed) {
+                throw e;
             }
+        }
+    }
 
-            list.forEach(writer::write);
+    private void writeAndCheck(ArrowFormatCWriter writer) {
+        List<InternalRow> list = new ArrayList<>();
+        List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();
 
-            writer.flush();
-            VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+        for (int i = 0; i < PRIMITIVE_TYPE.getFieldCount(); i++) {
+            
fieldGetters.add(InternalRow.createFieldGetter(PRIMITIVE_TYPE.getTypeAt(i), i));
+        }
+        for (int i = 0; i < 1000; i++) {
+            list.add(GenericRow.of(randomRowValues(null)));
+        }
 
-            ArrowBatchReader arrowBatchReader = new 
ArrowBatchReader(PRIMITIVE_TYPE, true);
-            Iterable<InternalRow> rows = 
arrowBatchReader.readBatch(vectorSchemaRoot);
+        list.forEach(writer::write);
 
-            Iterator<InternalRow> iterator = rows.iterator();
-            for (int i = 0; i < 1000; i++) {
-                InternalRow actual = iterator.next();
-                InternalRow expectec = list.get(i);
+        writer.flush();
+        VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
 
-                for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
-                    Assertions.assertThat(fieldGetter.getFieldOrNull(actual))
-                            .isEqualTo(fieldGetter.getFieldOrNull(expectec));
-                }
+        ArrowBatchReader arrowBatchReader = new 
ArrowBatchReader(PRIMITIVE_TYPE, true);
+        Iterable<InternalRow> rows = 
arrowBatchReader.readBatch(vectorSchemaRoot);
+
+        Iterator<InternalRow> iterator = rows.iterator();
+        for (int i = 0; i < 1000; i++) {
+            InternalRow actual = iterator.next();
+            InternalRow expectec = list.get(i);
+
+            for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
+                Assertions.assertThat(fieldGetter.getFieldOrNull(actual))
+                        .isEqualTo(fieldGetter.getFieldOrNull(expectec));
             }
-            writer.release();
         }
+        writer.release();
     }
 
     private Object[] randomRowValues(boolean[] nullable) {

Reply via email to