This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 40132ee3eb [arrow] Support arrow variant/shredding writer. (#7082)
40132ee3eb is described below
commit 40132ee3eb9e7e5bf7f6a5584967919c4d1bf73b
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jan 20 15:04:20 2026 +0800
[arrow] Support arrow variant/shredding writer. (#7082)
---
.../paimon/arrow/ArrowFieldTypeConversion.java | 2 +-
.../java/org/apache/paimon/arrow/ArrowUtils.java | 13 +++
.../paimon/arrow/vector/ArrowFormatCWriter.java | 11 +-
.../paimon/arrow/vector/ArrowFormatWriter.java | 84 +++++++++++++--
.../writer/ArrowFieldWriterFactoryVisitor.java | 3 +-
.../paimon/arrow/writer/ArrowFieldWriters.java | 117 +++++++++++++++++++++
.../paimon/arrow/vector/ArrowFormatWriterTest.java | 52 +++++++++
7 files changed, 271 insertions(+), 11 deletions(-)
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
index d97df368bd..e6bc1281a7 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java
@@ -154,7 +154,7 @@ public class ArrowFieldTypeConversion {
@Override
public FieldType visit(VariantType variantType) {
- throw new UnsupportedOperationException();
+ return new FieldType(variantType.isNullable(),
Types.MinorType.STRUCT.getType(), null);
}
@Override
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index 952223b6c9..7b387b409e 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -22,12 +22,14 @@ import org.apache.paimon.arrow.vector.ArrowCStruct;
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
@@ -219,6 +221,17 @@ public class ArrowUtils {
Arrays.asList(keyField, valueField));
children = Collections.singletonList(mapField);
+ } else if (dataType instanceof VariantType) {
+ children =
+ Arrays.asList(
+ new Field(
+ Variant.VALUE,
+ new FieldType(false,
Types.MinorType.VARBINARY.getType(), null),
+ null),
+ new Field(
+ Variant.METADATA,
+ new FieldType(false,
Types.MinorType.VARBINARY.getType(), null),
+ null));
} else if (dataType instanceof RowType) {
RowType rowType = (RowType) dataType;
children = new ArrayList<>();
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index 370823e04c..ddb79174ce 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -47,8 +47,15 @@ public class ArrowFormatCWriter implements AutoCloseable {
RowType rowType,
int writeBatchSize,
boolean caseSensitive,
- @Nullable Long memoryUsedMaxInVSR) {
- this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive,
memoryUsedMaxInVSR));
+ @Nullable Long memoryUsedMaxInVSR,
+ @Nullable RowType shreddingSchemas) {
+ this(
+ new ArrowFormatWriter(
+ rowType,
+ writeBatchSize,
+ caseSensitive,
+ memoryUsedMaxInVSR,
+ shreddingSchemas));
}
public ArrowFormatCWriter(
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index 0c507161ff..e9ac280ec3 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -22,9 +22,12 @@ import org.apache.paimon.arrow.ArrowFieldTypeConversion;
import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
+import org.apache.paimon.arrow.writer.ArrowFieldWriters;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -36,6 +39,9 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
/** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */
public class ArrowFormatWriter implements AutoCloseable {
@@ -49,7 +55,7 @@ public class ArrowFormatWriter implements AutoCloseable {
private int rowId;
public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean
caseSensitive) {
- this(rowType, writeBatchSize, caseSensitive, new RootAllocator(),
null);
+ this(rowType, writeBatchSize, caseSensitive, new RootAllocator(),
null, null);
}
public ArrowFormatWriter(
@@ -57,7 +63,13 @@ public class ArrowFormatWriter implements AutoCloseable {
int writeBatchSize,
boolean caseSensitive,
@Nullable Long memoryUsedMaxInBytes) {
- this(rowType, writeBatchSize, caseSensitive, new RootAllocator(),
memoryUsedMaxInBytes);
+ this(
+ rowType,
+ writeBatchSize,
+ caseSensitive,
+ new RootAllocator(),
+ memoryUsedMaxInBytes,
+ null);
}
public ArrowFormatWriter(
@@ -66,12 +78,38 @@ public class ArrowFormatWriter implements AutoCloseable {
boolean caseSensitive,
BufferAllocator allocator,
@Nullable Long memoryUsedMaxInBytes) {
+ this(rowType, writeBatchSize, caseSensitive, allocator,
memoryUsedMaxInBytes, null);
+ }
+
+ public ArrowFormatWriter(
+ RowType rowType,
+ int writeBatchSize,
+ boolean caseSensitive,
+ @Nullable Long memoryUsedMaxInBytes,
+ @Nullable RowType shreddingSchemas) {
this(
rowType,
writeBatchSize,
caseSensitive,
new RootAllocator(),
memoryUsedMaxInBytes,
+ shreddingSchemas);
+ }
+
+ public ArrowFormatWriter(
+ RowType rowType,
+ int writeBatchSize,
+ boolean caseSensitive,
+ BufferAllocator allocator,
+ @Nullable Long memoryUsedMaxInBytes,
+ @Nullable RowType shreddingSchemas) {
+ this(
+ rowType,
+ writeBatchSize,
+ caseSensitive,
+ allocator,
+ memoryUsedMaxInBytes,
+ shreddingSchemas,
ArrowFieldTypeConversion.ARROW_FIELD_TYPE_VISITOR,
ArrowFieldWriterFactoryVisitor.INSTANCE);
}
@@ -82,21 +120,34 @@ public class ArrowFormatWriter implements AutoCloseable {
boolean caseSensitive,
BufferAllocator allocator,
@Nullable Long memoryUsedMaxInBytes,
+ @Nullable RowType shreddingSchemas,
ArrowFieldTypeConversion.ArrowFieldTypeVisitor fieldTypeVisitor,
ArrowFieldWriterFactoryVisitor fieldWriterFactory) {
this.allocator = allocator;
+ RowType outputRowType = replaceWithShreddingType(rowType,
shreddingSchemas);
vectorSchemaRoot =
ArrowUtils.createVectorSchemaRoot(
- rowType, allocator, caseSensitive, fieldTypeVisitor);
+ outputRowType, allocator, caseSensitive,
fieldTypeVisitor);
fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];
for (int i = 0; i < fieldWriters.length; i++) {
- DataType type = rowType.getFields().get(i).type();
- fieldWriters[i] =
- type.accept(fieldWriterFactory)
- .create(vectorSchemaRoot.getVector(i),
type.isNullable());
+ DataField field = rowType.getFields().get(i);
+ DataType type = field.type();
+ if (type instanceof VariantType) {
+ RowType shreddingSchema =
+ shreddingSchemas != null &&
shreddingSchemas.containsField(field.name())
+ ? (RowType)
shreddingSchemas.getField(field.name()).type()
+ : null;
+ fieldWriters[i] =
+ new ArrowFieldWriters.VariantWriter(
+ vectorSchemaRoot.getVector(i),
type.isNullable(), shreddingSchema);
+ } else {
+ fieldWriters[i] =
+ type.accept(fieldWriterFactory)
+ .create(vectorSchemaRoot.getVector(i),
type.isNullable());
+ }
}
this.batchSize = writeBatchSize;
@@ -169,4 +220,23 @@ public class ArrowFormatWriter implements AutoCloseable {
public BufferAllocator getAllocator() {
return allocator;
}
+
+ private static RowType replaceWithShreddingType(
+ RowType rowType, @Nullable RowType shreddingSchemas) {
+ if (shreddingSchemas == null) {
+ return rowType;
+ }
+
+ List<DataField> newFields = new ArrayList<>();
+ for (DataField field : rowType.getFields()) {
+ if (field.type() instanceof VariantType
+ && shreddingSchemas.containsField(field.name())) {
+ RowType shreddingSchema = (RowType)
shreddingSchemas.getField(field.name()).type();
+ newFields.add(field.newType(shreddingSchema));
+ } else {
+ newFields.add(field);
+ }
+ }
+ return new RowType(rowType.isNullable(), newFields);
+ }
}
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
index 3c22ef8bb5..a20e6fc481 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
@@ -145,7 +145,8 @@ public class ArrowFieldWriterFactoryVisitor implements DataTypeVisitor<ArrowFiel
@Override
public ArrowFieldWriterFactory visit(VariantType variantType) {
- throw new UnsupportedOperationException("Doesn't support
VariantType.");
+ return (fieldVector, isNullable) ->
+ new ArrowFieldWriters.VariantWriter(fieldVector, isNullable,
null);
}
@Override
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
index 9e4f371a79..409b1f9ab2 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
@@ -21,6 +21,7 @@ package org.apache.paimon.arrow.writer;
import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
@@ -40,7 +41,14 @@ import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.ShortColumnVector;
import org.apache.paimon.data.columnar.TimestampColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.variant.GenericVariant;
+import org.apache.paimon.data.variant.PaimonShreddingUtils;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.data.variant.VariantSchema;
import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IntArrayList;
import org.apache.arrow.vector.BigIntVector;
@@ -65,10 +73,18 @@ import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
/** Registry of {@link ArrowFieldWriter}s. */
public class ArrowFieldWriters {
+ private static final RowType DEFAULT_VARIANT_ROW_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, Variant.VALUE,
DataTypes.BYTES().notNull()),
+ new DataField(1, Variant.METADATA,
DataTypes.BYTES().notNull())));
+
/** Writer for CHAR & VARCHAR. */
public static class StringWriter extends ArrowFieldWriter {
@@ -511,6 +527,107 @@ public class ArrowFieldWriters {
}
}
+ /** Writer for VARIANT. */
+ public static class VariantWriter extends ArrowFieldWriter {
+
+ @Nullable private final VariantSchema variantSchema;
+ @Nullable private final GenericRow reusableRow;
+ private final ArrowFieldWriter[] fieldWriters;
+ private final StructVector structVector;
+ private final int fieldCount;
+
+ public VariantWriter(
+ FieldVector fieldVector, boolean isNullable, @Nullable RowType
shreddingSchema) {
+ super(fieldVector, isNullable);
+ this.structVector = (StructVector) fieldVector;
+
+ RowType outputRowType;
+ if (shreddingSchema != null) {
+ this.variantSchema =
PaimonShreddingUtils.buildVariantSchema(shreddingSchema);
+ this.reusableRow = null;
+ outputRowType = shreddingSchema;
+ } else {
+ this.variantSchema = null;
+ this.reusableRow = new
GenericRow(DEFAULT_VARIANT_ROW_TYPE.getFieldCount());
+ outputRowType = DEFAULT_VARIANT_ROW_TYPE;
+ }
+
+ this.fieldCount = outputRowType.getFieldCount();
+ this.fieldWriters = new ArrowFieldWriter[fieldCount];
+ List<FieldVector> children = structVector.getChildrenFromFields();
+ for (int i = 0; i < fieldCount; i++) {
+ fieldWriters[i] =
+ outputRowType
+ .getTypeAt(i)
+
.accept(ArrowFieldWriterFactoryVisitor.INSTANCE)
+ .create(children.get(i),
outputRowType.getTypeAt(i).isNullable());
+ }
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ for (ArrowFieldWriter fieldWriter : fieldWriters) {
+ fieldWriter.reset();
+ }
+ }
+
+ @Override
+ protected void doWrite(
+ ColumnVector columnVector,
+ @Nullable int[] pickedInColumn,
+ int startIndex,
+ int batchRows) {
+ RowColumnVector rowColumnVector = (RowColumnVector) columnVector;
+ for (int i = 0; i < batchRows; i++) {
+ int row = getRowNumber(startIndex, i, pickedInColumn);
+ if (columnVector.isNullAt(row)) {
+ structVector.setNull(i);
+ continue;
+ }
+
+ InternalRow rowData = rowColumnVector.getRow(row);
+ if (variantSchema != null && rowData.getFieldCount() !=
fieldCount) {
+ GenericVariant variant =
+ new GenericVariant(rowData.getBinary(0),
rowData.getBinary(1));
+ InternalRow shreddedRow =
+ PaimonShreddingUtils.castShredded(variant,
variantSchema);
+ writeRow(i, shreddedRow);
+ } else {
+ writeRow(i, rowData);
+ }
+ }
+ }
+
+ @Override
+ protected void doWrite(int rowIndex, DataGetters getters, int pos) {
+ Variant variant = getters.getVariant(pos);
+ if (variantSchema != null) {
+ GenericVariant genericVariant =
+ variant instanceof GenericVariant
+ ? (GenericVariant) variant
+ : new GenericVariant(variant.value(),
variant.metadata());
+ InternalRow shreddedRow =
+ PaimonShreddingUtils.castShredded(genericVariant,
variantSchema);
+ writeRow(rowIndex, shreddedRow);
+ } else {
+ if (reusableRow == null) {
+ throw new IllegalStateException("Reusable row is not
initialized.");
+ }
+ reusableRow.setField(0, variant.value());
+ reusableRow.setField(1, variant.metadata());
+ writeRow(rowIndex, reusableRow);
+ }
+ }
+
+ private void writeRow(int rowIndex, InternalRow rowData) {
+ for (int i = 0; i < fieldCount; i++) {
+ fieldWriters[i].write(rowIndex, rowData, i);
+ }
+ structVector.setIndexDefined(rowIndex);
+ }
+ }
+
/** Writer for ARRAY. */
public static class ArrayWriter extends ArrowFieldWriter {
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index 63b7155474..ec1fe10c46 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -30,6 +30,9 @@ import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.GenericVariant;
+import org.apache.paimon.data.variant.PaimonShreddingUtils;
+import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -40,6 +43,7 @@ import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
@@ -136,6 +140,53 @@ public class ArrowFormatWriterTest {
}
}
+ @Test
+ public void testWriteVariant() {
+ RowType rowType = new RowType(Arrays.asList(new DataField(0, "v",
DataTypes.VARIANT())));
+ GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\":
\"x\"}");
+ try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 16,
true)) {
+ writer.write(GenericRow.of(variant));
+ writer.flush();
+
+ StructVector variantVector = (StructVector)
writer.getVectorSchemaRoot().getVector("v");
+ assertThat(variantVector.isNull(0)).isFalse();
+ VarBinaryVector valueVector = (VarBinaryVector)
variantVector.getChild(Variant.VALUE);
+ VarBinaryVector metadataVector =
+ (VarBinaryVector) variantVector.getChild(Variant.METADATA);
+ assertThat(valueVector.getObject(0)).isEqualTo(variant.value());
+
assertThat(metadataVector.getObject(0)).isEqualTo(variant.metadata());
+ }
+ }
+
+ @Test
+ public void testWriteVariantWithShreddingSchema() {
+ RowType rowType = new RowType(Arrays.asList(new DataField(0, "v",
DataTypes.VARIANT())));
+ RowType expectedSchema =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "a", DataTypes.INT()),
+ DataTypes.FIELD(1, "b", DataTypes.STRING()));
+ RowType shreddingSchemas =
+ new RowType(
+ Arrays.asList(
+ new DataField(
+ 0,
+ "v",
+
PaimonShreddingUtils.variantShreddingSchema(
+ expectedSchema))));
+ GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\":
\"x\"}");
+
+ try (ArrowFormatWriter writer =
+ new ArrowFormatWriter(rowType, 16, true, null,
shreddingSchemas)) {
+ writer.write(GenericRow.of(variant));
+ writer.flush();
+
+ StructVector variantVector = (StructVector)
writer.getVectorSchemaRoot().getVector("v");
+ assertThat(variantVector.isNull(0)).isFalse();
+
assertThat(variantVector.getChild(PaimonShreddingUtils.TYPED_VALUE_FIELD_NAME))
+ .isNotNull();
+ }
+ }
+
@Test
public void testReadWithSchemaMessUp() {
try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE,
4096, true)) {
@@ -346,6 +397,7 @@ public class ArrowFormatWriterTest {
true,
allocator,
null,
+ null,
customFieldTypeVisitor,
customFieldWriterVisitor))) {
writeAndCheckCustom(writer, customConverterVisitor);