This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 40132ee3eb [arrow] Support arrow variant/shredding writer. (#7082)
40132ee3eb is described below

commit 40132ee3eb9e7e5bf7f6a5584967919c4d1bf73b
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jan 20 15:04:20 2026 +0800

    [arrow] Support arrow variant/shredding writer. (#7082)
---
 .../paimon/arrow/ArrowFieldTypeConversion.java     |   2 +-
 .../java/org/apache/paimon/arrow/ArrowUtils.java   |  13 +++
 .../paimon/arrow/vector/ArrowFormatCWriter.java    |  11 +-
 .../paimon/arrow/vector/ArrowFormatWriter.java     |  84 +++++++++++++--
 .../writer/ArrowFieldWriterFactoryVisitor.java     |   3 +-
 .../paimon/arrow/writer/ArrowFieldWriters.java     | 117 +++++++++++++++++++++
 .../paimon/arrow/vector/ArrowFormatWriterTest.java |  52 +++++++++
 7 files changed, 271 insertions(+), 11 deletions(-)

diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
index d97df368bd..e6bc1281a7 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
@@ -154,7 +154,7 @@ public class ArrowFieldTypeConversion {
 
         @Override
         public FieldType visit(VariantType variantType) {
-            throw new UnsupportedOperationException();
+            return new FieldType(variantType.isNullable(), 
Types.MinorType.STRUCT.getType(), null);
         }
 
         @Override
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index 952223b6c9..7b387b409e 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -22,12 +22,14 @@ import org.apache.paimon.arrow.vector.ArrowCStruct;
 import org.apache.paimon.arrow.writer.ArrowFieldWriter;
 import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
 
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
@@ -219,6 +221,17 @@ public class ArrowUtils {
                             Arrays.asList(keyField, valueField));
 
             children = Collections.singletonList(mapField);
+        } else if (dataType instanceof VariantType) {
+            children =
+                    Arrays.asList(
+                            new Field(
+                                    Variant.VALUE,
+                                    new FieldType(false, 
Types.MinorType.VARBINARY.getType(), null),
+                                    null),
+                            new Field(
+                                    Variant.METADATA,
+                                    new FieldType(false, 
Types.MinorType.VARBINARY.getType(), null),
+                                    null));
         } else if (dataType instanceof RowType) {
             RowType rowType = (RowType) dataType;
             children = new ArrayList<>();
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index 370823e04c..ddb79174ce 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -47,8 +47,15 @@ public class ArrowFormatCWriter implements AutoCloseable {
             RowType rowType,
             int writeBatchSize,
             boolean caseSensitive,
-            @Nullable Long memoryUsedMaxInVSR) {
-        this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, 
memoryUsedMaxInVSR));
+            @Nullable Long memoryUsedMaxInVSR,
+            @Nullable RowType shreddingSchemas) {
+        this(
+                new ArrowFormatWriter(
+                        rowType,
+                        writeBatchSize,
+                        caseSensitive,
+                        memoryUsedMaxInVSR,
+                        shreddingSchemas));
     }
 
     public ArrowFormatCWriter(
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index 0c507161ff..e9ac280ec3 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -22,9 +22,12 @@ import org.apache.paimon.arrow.ArrowFieldTypeConversion;
 import org.apache.paimon.arrow.ArrowUtils;
 import org.apache.paimon.arrow.writer.ArrowFieldWriter;
 import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
+import org.apache.paimon.arrow.writer.ArrowFieldWriters;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
@@ -36,6 +39,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */
 public class ArrowFormatWriter implements AutoCloseable {
 
@@ -49,7 +55,7 @@ public class ArrowFormatWriter implements AutoCloseable {
     private int rowId;
 
     public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean 
caseSensitive) {
-        this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), 
null);
+        this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), 
null, null);
     }
 
     public ArrowFormatWriter(
@@ -57,7 +63,13 @@ public class ArrowFormatWriter implements AutoCloseable {
             int writeBatchSize,
             boolean caseSensitive,
             @Nullable Long memoryUsedMaxInBytes) {
-        this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), 
memoryUsedMaxInBytes);
+        this(
+                rowType,
+                writeBatchSize,
+                caseSensitive,
+                new RootAllocator(),
+                memoryUsedMaxInBytes,
+                null);
     }
 
     public ArrowFormatWriter(
@@ -66,12 +78,38 @@ public class ArrowFormatWriter implements AutoCloseable {
             boolean caseSensitive,
             BufferAllocator allocator,
             @Nullable Long memoryUsedMaxInBytes) {
+        this(rowType, writeBatchSize, caseSensitive, allocator, 
memoryUsedMaxInBytes, null);
+    }
+
+    public ArrowFormatWriter(
+            RowType rowType,
+            int writeBatchSize,
+            boolean caseSensitive,
+            @Nullable Long memoryUsedMaxInBytes,
+            @Nullable RowType shreddingSchemas) {
         this(
                 rowType,
                 writeBatchSize,
                 caseSensitive,
                 new RootAllocator(),
                 memoryUsedMaxInBytes,
+                shreddingSchemas);
+    }
+
+    public ArrowFormatWriter(
+            RowType rowType,
+            int writeBatchSize,
+            boolean caseSensitive,
+            BufferAllocator allocator,
+            @Nullable Long memoryUsedMaxInBytes,
+            @Nullable RowType shreddingSchemas) {
+        this(
+                rowType,
+                writeBatchSize,
+                caseSensitive,
+                allocator,
+                memoryUsedMaxInBytes,
+                shreddingSchemas,
                 ArrowFieldTypeConversion.ARROW_FIELD_TYPE_VISITOR,
                 ArrowFieldWriterFactoryVisitor.INSTANCE);
     }
@@ -82,21 +120,34 @@ public class ArrowFormatWriter implements AutoCloseable {
             boolean caseSensitive,
             BufferAllocator allocator,
             @Nullable Long memoryUsedMaxInBytes,
+            @Nullable RowType shreddingSchemas,
             ArrowFieldTypeConversion.ArrowFieldTypeVisitor fieldTypeVisitor,
             ArrowFieldWriterFactoryVisitor fieldWriterFactory) {
         this.allocator = allocator;
 
+        RowType outputRowType = replaceWithShreddingType(rowType, 
shreddingSchemas);
         vectorSchemaRoot =
                 ArrowUtils.createVectorSchemaRoot(
-                        rowType, allocator, caseSensitive, fieldTypeVisitor);
+                        outputRowType, allocator, caseSensitive, 
fieldTypeVisitor);
 
         fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
 
         for (int i = 0; i < fieldWriters.length; i++) {
-            DataType type = rowType.getFields().get(i).type();
-            fieldWriters[i] =
-                    type.accept(fieldWriterFactory)
-                            .create(vectorSchemaRoot.getVector(i), 
type.isNullable());
+            DataField field = rowType.getFields().get(i);
+            DataType type = field.type();
+            if (type instanceof VariantType) {
+                RowType shreddingSchema =
+                        shreddingSchemas != null && 
shreddingSchemas.containsField(field.name())
+                                ? (RowType) 
shreddingSchemas.getField(field.name()).type()
+                                : null;
+                fieldWriters[i] =
+                        new ArrowFieldWriters.VariantWriter(
+                                vectorSchemaRoot.getVector(i), 
type.isNullable(), shreddingSchema);
+            } else {
+                fieldWriters[i] =
+                        type.accept(fieldWriterFactory)
+                                .create(vectorSchemaRoot.getVector(i), 
type.isNullable());
+            }
         }
 
         this.batchSize = writeBatchSize;
@@ -169,4 +220,23 @@ public class ArrowFormatWriter implements AutoCloseable {
     public BufferAllocator getAllocator() {
         return allocator;
     }
+
+    private static RowType replaceWithShreddingType(
+            RowType rowType, @Nullable RowType shreddingSchemas) {
+        if (shreddingSchemas == null) {
+            return rowType;
+        }
+
+        List<DataField> newFields = new ArrayList<>();
+        for (DataField field : rowType.getFields()) {
+            if (field.type() instanceof VariantType
+                    && shreddingSchemas.containsField(field.name())) {
+                RowType shreddingSchema = (RowType) 
shreddingSchemas.getField(field.name()).type();
+                newFields.add(field.newType(shreddingSchema));
+            } else {
+                newFields.add(field);
+            }
+        }
+        return new RowType(rowType.isNullable(), newFields);
+    }
 }
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
index 3c22ef8bb5..a20e6fc481 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
@@ -145,7 +145,8 @@ public class ArrowFieldWriterFactoryVisitor implements DataTypeVisitor<ArrowFiel
 
     @Override
     public ArrowFieldWriterFactory visit(VariantType variantType) {
-        throw new UnsupportedOperationException("Doesn't support 
VariantType.");
+        return (fieldVector, isNullable) ->
+                new ArrowFieldWriters.VariantWriter(fieldVector, isNullable, 
null);
     }
 
     @Override
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
index 9e4f371a79..409b1f9ab2 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
@@ -21,6 +21,7 @@ package org.apache.paimon.arrow.writer;
 import org.apache.paimon.arrow.ArrowUtils;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
@@ -40,7 +41,14 @@ import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.ShortColumnVector;
 import org.apache.paimon.data.columnar.TimestampColumnVector;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.variant.GenericVariant;
+import org.apache.paimon.data.variant.PaimonShreddingUtils;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.data.variant.VariantSchema;
 import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IntArrayList;
 
 import org.apache.arrow.vector.BigIntVector;
@@ -65,10 +73,18 @@ import javax.annotation.Nullable;
 
 import java.math.BigDecimal;
 import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
 
 /** Registry of {@link ArrowFieldWriter}s. */
 public class ArrowFieldWriters {
 
+    private static final RowType DEFAULT_VARIANT_ROW_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, Variant.VALUE, 
DataTypes.BYTES().notNull()),
+                            new DataField(1, Variant.METADATA, 
DataTypes.BYTES().notNull())));
+
     /** Writer for CHAR & VARCHAR. */
     public static class StringWriter extends ArrowFieldWriter {
 
@@ -511,6 +527,107 @@ public class ArrowFieldWriters {
         }
     }
 
+    /** Writer for VARIANT. */
+    public static class VariantWriter extends ArrowFieldWriter {
+
+        @Nullable private final VariantSchema variantSchema;
+        @Nullable private final GenericRow reusableRow;
+        private final ArrowFieldWriter[] fieldWriters;
+        private final StructVector structVector;
+        private final int fieldCount;
+
+        public VariantWriter(
+                FieldVector fieldVector, boolean isNullable, @Nullable RowType 
shreddingSchema) {
+            super(fieldVector, isNullable);
+            this.structVector = (StructVector) fieldVector;
+
+            RowType outputRowType;
+            if (shreddingSchema != null) {
+                this.variantSchema = 
PaimonShreddingUtils.buildVariantSchema(shreddingSchema);
+                this.reusableRow = null;
+                outputRowType = shreddingSchema;
+            } else {
+                this.variantSchema = null;
+                this.reusableRow = new 
GenericRow(DEFAULT_VARIANT_ROW_TYPE.getFieldCount());
+                outputRowType = DEFAULT_VARIANT_ROW_TYPE;
+            }
+
+            this.fieldCount = outputRowType.getFieldCount();
+            this.fieldWriters = new ArrowFieldWriter[fieldCount];
+            List<FieldVector> children = structVector.getChildrenFromFields();
+            for (int i = 0; i < fieldCount; i++) {
+                fieldWriters[i] =
+                        outputRowType
+                                .getTypeAt(i)
+                                
.accept(ArrowFieldWriterFactoryVisitor.INSTANCE)
+                                .create(children.get(i), 
outputRowType.getTypeAt(i).isNullable());
+            }
+        }
+
+        @Override
+        public void reset() {
+            super.reset();
+            for (ArrowFieldWriter fieldWriter : fieldWriters) {
+                fieldWriter.reset();
+            }
+        }
+
+        @Override
+        protected void doWrite(
+                ColumnVector columnVector,
+                @Nullable int[] pickedInColumn,
+                int startIndex,
+                int batchRows) {
+            RowColumnVector rowColumnVector = (RowColumnVector) columnVector;
+            for (int i = 0; i < batchRows; i++) {
+                int row = getRowNumber(startIndex, i, pickedInColumn);
+                if (columnVector.isNullAt(row)) {
+                    structVector.setNull(i);
+                    continue;
+                }
+
+                InternalRow rowData = rowColumnVector.getRow(row);
+                if (variantSchema != null && rowData.getFieldCount() != 
fieldCount) {
+                    GenericVariant variant =
+                            new GenericVariant(rowData.getBinary(0), 
rowData.getBinary(1));
+                    InternalRow shreddedRow =
+                            PaimonShreddingUtils.castShredded(variant, 
variantSchema);
+                    writeRow(i, shreddedRow);
+                } else {
+                    writeRow(i, rowData);
+                }
+            }
+        }
+
+        @Override
+        protected void doWrite(int rowIndex, DataGetters getters, int pos) {
+            Variant variant = getters.getVariant(pos);
+            if (variantSchema != null) {
+                GenericVariant genericVariant =
+                        variant instanceof GenericVariant
+                                ? (GenericVariant) variant
+                                : new GenericVariant(variant.value(), 
variant.metadata());
+                InternalRow shreddedRow =
+                        PaimonShreddingUtils.castShredded(genericVariant, 
variantSchema);
+                writeRow(rowIndex, shreddedRow);
+            } else {
+                if (reusableRow == null) {
+                    throw new IllegalStateException("Reusable row is not 
initialized.");
+                }
+                reusableRow.setField(0, variant.value());
+                reusableRow.setField(1, variant.metadata());
+                writeRow(rowIndex, reusableRow);
+            }
+        }
+
+        private void writeRow(int rowIndex, InternalRow rowData) {
+            for (int i = 0; i < fieldCount; i++) {
+                fieldWriters[i].write(rowIndex, rowData, i);
+            }
+            structVector.setIndexDefined(rowIndex);
+        }
+    }
+
     /** Writer for ARRAY. */
     public static class ArrayWriter extends ArrowFieldWriter {
 
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index 63b7155474..ec1fe10c46 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -30,6 +30,9 @@ import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.GenericVariant;
+import org.apache.paimon.data.variant.PaimonShreddingUtils;
+import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -40,6 +43,7 @@ import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
@@ -136,6 +140,53 @@ public class ArrowFormatWriterTest {
         }
     }
 
+    @Test
+    public void testWriteVariant() {
+        RowType rowType = new RowType(Arrays.asList(new DataField(0, "v", 
DataTypes.VARIANT())));
+        GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\": 
\"x\"}");
+        try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 16, 
true)) {
+            writer.write(GenericRow.of(variant));
+            writer.flush();
+
+            StructVector variantVector = (StructVector) 
writer.getVectorSchemaRoot().getVector("v");
+            assertThat(variantVector.isNull(0)).isFalse();
+            VarBinaryVector valueVector = (VarBinaryVector) 
variantVector.getChild(Variant.VALUE);
+            VarBinaryVector metadataVector =
+                    (VarBinaryVector) variantVector.getChild(Variant.METADATA);
+            assertThat(valueVector.getObject(0)).isEqualTo(variant.value());
+            
assertThat(metadataVector.getObject(0)).isEqualTo(variant.metadata());
+        }
+    }
+
+    @Test
+    public void testWriteVariantWithShreddingSchema() {
+        RowType rowType = new RowType(Arrays.asList(new DataField(0, "v", 
DataTypes.VARIANT())));
+        RowType expectedSchema =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "a", DataTypes.INT()),
+                        DataTypes.FIELD(1, "b", DataTypes.STRING()));
+        RowType shreddingSchemas =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(
+                                        0,
+                                        "v",
+                                        
PaimonShreddingUtils.variantShreddingSchema(
+                                                expectedSchema))));
+        GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\": 
\"x\"}");
+
+        try (ArrowFormatWriter writer =
+                new ArrowFormatWriter(rowType, 16, true, null, 
shreddingSchemas)) {
+            writer.write(GenericRow.of(variant));
+            writer.flush();
+
+            StructVector variantVector = (StructVector) 
writer.getVectorSchemaRoot().getVector("v");
+            assertThat(variantVector.isNull(0)).isFalse();
+            
assertThat(variantVector.getChild(PaimonShreddingUtils.TYPED_VALUE_FIELD_NAME))
+                    .isNotNull();
+        }
+    }
+
     @Test
     public void testReadWithSchemaMessUp() {
         try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE, 
4096, true)) {
@@ -346,6 +397,7 @@ public class ArrowFormatWriterTest {
                                     true,
                                     allocator,
                                     null,
+                                    null,
                                     customFieldTypeVisitor,
                                     customFieldWriterVisitor))) {
                 writeAndCheckCustom(writer, customConverterVisitor);

Reply via email to