This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 080a3ef89f [core] Introduce RowToColumnConverter (#6673)
080a3ef89f is described below

commit 080a3ef89f9fd9dc684cd8c357b7fe9c6cde107f
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Nov 26 10:17:35 2025 +0800

    [core] Introduce RowToColumnConverter (#6673)
---
 .../paimon/data/columnar/RowToColumnConverter.java | 373 +++++++++++++++++
 .../columnar/heap/AbstractArrayBasedVector.java    |  16 +
 .../data/columnar/heap/HeapBooleanVector.java      |   7 +
 .../paimon/data/columnar/heap/HeapByteVector.java  |   7 +
 .../paimon/data/columnar/heap/HeapBytesVector.java |   7 +
 .../data/columnar/heap/HeapDoubleVector.java       |   7 +
 .../paimon/data/columnar/heap/HeapFloatVector.java |   7 +
 .../paimon/data/columnar/heap/HeapLongVector.java  |   7 +
 .../paimon/data/columnar/heap/HeapMapVector.java   |   8 +
 .../paimon/data/columnar/heap/HeapRowVector.java   |   5 +
 .../paimon/data/columnar/heap/HeapShortVector.java |   7 +
 .../data/columnar/heap/HeapTimestampVector.java    |   7 +
 .../columnar/writable/WritableBooleanVector.java   |   2 +
 .../data/columnar/writable/WritableByteVector.java |   2 +
 .../columnar/writable/WritableBytesVector.java     |   2 +
 .../columnar/writable/WritableColumnVector.java    |   7 +
 .../columnar/writable/WritableDoubleVector.java    |   2 +
 .../columnar/writable/WritableFloatVector.java     |   2 +
 .../data/columnar/writable/WritableLongVector.java |   2 +
 .../columnar/writable/WritableShortVector.java     |   2 +
 .../columnar/writable/WritableTimestampVector.java |   2 +
 .../data/columnar/RowToColumnConverterTest.java    | 461 +++++++++++++++++++++
 .../parquet/reader/ParquetDecimalVector.java       |  14 +
 23 files changed, 956 insertions(+)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
new file mode 100644
index 0000000000..d2a378846c
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapArrayVector;
+import org.apache.paimon.data.columnar.heap.HeapMapVector;
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
+import org.apache.paimon.data.columnar.writable.WritableByteVector;
+import org.apache.paimon.data.columnar.writable.WritableBytesVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableDoubleVector;
+import org.apache.paimon.data.columnar.writable.WritableFloatVector;
+import org.apache.paimon.data.columnar.writable.WritableIntVector;
+import org.apache.paimon.data.columnar.writable.WritableLongVector;
+import org.apache.paimon.data.columnar.writable.WritableShortVector;
+import org.apache.paimon.data.columnar.writable.WritableTimestampVector;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BlobType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.types.VariantType;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Convert row-based data to columnar data. */
+public class RowToColumnConverter {
+
+    private final TypeConverter[] converters;
+
+    /** Builds one {@link TypeConverter} per field of {@code rowType}, in field order. */
+    public RowToColumnConverter(RowType rowType) {
+        List<DataType> types = rowType.getFieldTypes();
+        TypeConverter[] perField = new TypeConverter[types.size()];
+        int pos = 0;
+        for (DataType type : types) {
+            perField[pos++] = TypeConverter.getConverterForType(type);
+        }
+        this.converters = perField;
+    }
+
+    public void convert(InternalRow row, WritableColumnVector[] vectors) {
+        for (int i = 0; i < row.getFieldCount(); i++) {
+            converters[i].append(row, i, vectors[i]);
+        }
+    }
+
+    private interface TypeConverter extends Serializable {
+
+        /** Appends the value of {@code row} at position {@code column} to {@code cv}. */
+        void append(DataGetters row, int column, WritableColumnVector cv);
+
+        /** Returns the converter the shared {@link TypeConverterVisitor} yields for the type. */
+        static TypeConverter getConverterForType(DataType dataType) {
+            DataTypeVisitor<TypeConverter> visitor = TypeConverterVisitor.INSTANCE;
+            return dataType.accept(visitor);
+        }
+
+        class TypeConverterVisitor implements DataTypeVisitor<TypeConverter> {
+
+            static final TypeConverterVisitor INSTANCE = new 
TypeConverterVisitor();
+
+            /**
+             * Writes the value of {@code row} at {@code column} into {@code cv}. For nullable
+             * types {@code createConverter} wraps the writer with a null check, so the writer is
+             * only invoked for non-null values in that case.
+             */
+            @FunctionalInterface
+            interface ValueWriter {
+                void write(DataGetters row, int column, WritableColumnVector 
cv);
+            }
+
+            @Override
+            public TypeConverter visit(CharType charType) {
+                return createConverter(
+                        charType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableByteVector) 
cv).appendByte(row.getByte(column)));
+            }
+
+            @Override
+            public TypeConverter visit(VarCharType varCharType) {
+                return createConverter(
+                        varCharType.isNullable(),
+                        (row, column, cv) -> {
+                            byte[] bytes = row.getString(column).toBytes();
+                            ((WritableBytesVector) cv).appendByteArray(bytes, 
0, bytes.length);
+                        });
+            }
+
+            @Override
+            public TypeConverter visit(BooleanType booleanType) {
+                return createConverter(
+                        booleanType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableBooleanVector) 
cv).appendBoolean(row.getBoolean(column)));
+            }
+
+            @Override
+            public TypeConverter visit(BinaryType binaryType) {
+                return binaryConverter(binaryType.isNullable());
+            }
+
+            @Override
+            public TypeConverter visit(VarBinaryType varBinaryType) {
+                return binaryConverter(varBinaryType.isNullable());
+            }
+
+            @Override
+            public TypeConverter visit(DecimalType decimalType) {
+                boolean nullable = decimalType.isNullable();
+                // The unscaled value is appended in the form the target vector accepts:
+                // int, long or byte array. Any other vector kind is rejected.
+                ValueWriter writer =
+                        (row, column, cv) -> {
+                            int precision = decimalType.getPrecision();
+                            int scale = decimalType.getScale();
+                            Decimal value = row.getDecimal(column, precision, scale);
+                            if (cv instanceof WritableIntVector) {
+                                WritableIntVector ints = (WritableIntVector) cv;
+                                ints.appendInt((int) value.toUnscaledLong());
+                                return;
+                            }
+                            if (cv instanceof WritableLongVector) {
+                                WritableLongVector longs = (WritableLongVector) cv;
+                                longs.appendLong(value.toUnscaledLong());
+                                return;
+                            }
+                            if (cv instanceof WritableBytesVector) {
+                                byte[] unscaled = value.toUnscaledBytes();
+                                ((WritableBytesVector) cv)
+                                        .appendByteArray(unscaled, 0, unscaled.length);
+                                return;
+                            }
+                            throw new UnsupportedOperationException(
+                                    "Unsupported column vector: " + cv);
+                        };
+                return createConverter(nullable, writer);
+            }
+
+            @Override
+            public TypeConverter visit(TinyIntType tinyIntType) {
+                return createConverter(
+                        tinyIntType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableByteVector) 
cv).appendByte(row.getByte(column)));
+            }
+
+            @Override
+            public TypeConverter visit(SmallIntType smallIntType) {
+                return createConverter(
+                        smallIntType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableShortVector) 
cv).appendShort(row.getShort(column)));
+            }
+
+            @Override
+            public TypeConverter visit(IntType intType) {
+                return createConverter(
+                        intType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableIntVector) 
cv).appendInt(row.getInt(column)));
+            }
+
+            @Override
+            public TypeConverter visit(BigIntType bigIntType) {
+                return createConverter(
+                        bigIntType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableLongVector) 
cv).appendLong(row.getLong(column)));
+            }
+
+            @Override
+            public TypeConverter visit(FloatType floatType) {
+                return createConverter(
+                        floatType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableFloatVector) 
cv).appendFloat(row.getFloat(column)));
+            }
+
+            @Override
+            public TypeConverter visit(DoubleType doubleType) {
+                return createConverter(
+                        doubleType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableDoubleVector) 
cv).appendDouble(row.getDouble(column)));
+            }
+
+            @Override
+            public TypeConverter visit(DateType dateType) {
+                return createConverter(
+                        dateType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableIntVector) 
cv).appendInt(row.getInt(column)));
+            }
+
+            @Override
+            public TypeConverter visit(TimeType timeType) {
+                return createConverter(
+                        timeType.isNullable(),
+                        (row, column, cv) ->
+                                ((WritableIntVector) 
cv).appendInt(row.getInt(column)));
+            }
+
+            @Override
+            public TypeConverter visit(TimestampType timestampType) {
+                return timestampConverter(timestampType.isNullable(), 
timestampType.getPrecision());
+            }
+
+            @Override
+            public TypeConverter visit(LocalZonedTimestampType 
localZonedTimestampType) {
+                return timestampConverter(
+                        localZonedTimestampType.isNullable(),
+                        localZonedTimestampType.getPrecision());
+            }
+
+            @Override
+            public TypeConverter visit(VariantType variantType) {
+                boolean nullable = variantType.isNullable();
+                // A variant is split over two child bytes vectors:
+                // child 0 receives the value bytes, child 1 the metadata bytes.
+                ValueWriter writer =
+                        (row, column, cv) -> {
+                            WritableBytesVector valueChild =
+                                    (WritableBytesVector) cv.getChildren()[0];
+                            WritableBytesVector metadataChild =
+                                    (WritableBytesVector) cv.getChildren()[1];
+
+                            Variant source = row.getVariant(column);
+                            byte[] valueBytes = source.value();
+                            byte[] metadataBytes = source.metadata();
+                            valueChild.appendByteArray(valueBytes, 0, valueBytes.length);
+                            metadataChild.appendByteArray(metadataBytes, 0, metadataBytes.length);
+                        };
+                return createConverter(nullable, writer);
+            }
+
+            @Override
+            public TypeConverter visit(BlobType blobType) {
+                // BLOB has no row-to-column conversion; say which type was rejected so the
+                // failure is diagnosable from the exception alone.
+                throw new UnsupportedOperationException(
+                        "Unsupported type for row to column conversion: " + blobType);
+            }
+
+            @Override
+            public TypeConverter visit(ArrayType arrayType) {
+                // Resolve the element converter once per type rather than once per appended
+                // array value. As a consequence an unsupported element type is reported when
+                // the converter is built instead of on the first non-null array.
+                TypeConverter elementConverter = getConverterForType(arrayType.getElementType());
+                return createConverter(
+                        arrayType.isNullable(),
+                        (row, column, cv) -> {
+                            HeapArrayVector arrayVector = (HeapArrayVector) cv;
+                            InternalArray values = row.getArray(column);
+                            int numElements = values.size();
+                            // Registers the new entry before its elements are appended.
+                            arrayVector.appendArray(numElements);
+
+                            WritableColumnVector arrData =
+                                    (WritableColumnVector) arrayVector.getColumnVector();
+                            for (int i = 0; i < numElements; i++) {
+                                elementConverter.append(values, i, arrData);
+                            }
+                        });
+            }
+
+            @Override
+            public TypeConverter visit(MultisetType multisetType) {
+                // MULTISET has no row-to-column conversion; say which type was rejected so the
+                // failure is diagnosable from the exception alone.
+                throw new UnsupportedOperationException(
+                        "Unsupported type for row to column conversion: " + multisetType);
+            }
+
+            @Override
+            public TypeConverter visit(MapType mapType) {
+                // Resolve key/value converters once per type rather than once per appended map
+                // value. As a consequence an unsupported key or value type is reported when the
+                // converter is built instead of on the first non-null map.
+                TypeConverter keyConverter = getConverterForType(mapType.getKeyType());
+                TypeConverter valueConverter = getConverterForType(mapType.getValueType());
+                return createConverter(
+                        mapType.isNullable(),
+                        (row, column, cv) -> {
+                            HeapMapVector mapVector = (HeapMapVector) cv;
+                            InternalMap m = row.getMap(column);
+                            WritableColumnVector keys = (WritableColumnVector) mapVector.getKeys();
+                            WritableColumnVector values =
+                                    (WritableColumnVector) mapVector.getValues();
+                            int numElements = m.size();
+                            // Registers the new entry before its keys/values are appended.
+                            mapVector.appendArray(numElements);
+
+                            InternalArray srcKeys = m.keyArray();
+                            InternalArray srcValues = m.valueArray();
+                            for (int i = 0; i < numElements; i++) {
+                                keyConverter.append(srcKeys, i, keys);
+                                valueConverter.append(srcValues, i, values);
+                            }
+                        });
+            }
+
+            @Override
+            public TypeConverter visit(RowType rowType) {
+                // Resolve the per-field converters once per type rather than for every field of
+                // every appended row. As a consequence an unsupported field type is reported
+                // when the converter is built instead of on the first non-null row.
+                int fieldCount = rowType.getFieldCount();
+                TypeConverter[] fieldConverters = new TypeConverter[fieldCount];
+                for (int i = 0; i < fieldCount; i++) {
+                    fieldConverters[i] = getConverterForType(rowType.getTypeAt(i));
+                }
+                return createConverter(
+                        rowType.isNullable(),
+                        (row, column, cv) -> {
+                            HeapRowVector rowVector = (HeapRowVector) cv;
+                            rowVector.appendRow();
+                            InternalRow data = row.getRow(column, fieldCount);
+                            ColumnVector[] children = cv.getChildren();
+                            for (int i = 0; i < fieldCount; i++) {
+                                fieldConverters[i].append(
+                                        data, i, (WritableColumnVector) children[i]);
+                            }
+                        });
+            }
+
+            /**
+             * Turns {@code writer} into a {@link TypeConverter}. For nullable types the result
+             * appends a null when the source field is null and delegates to {@code writer}
+             * otherwise; for non-nullable types it delegates unconditionally.
+             */
+            private static TypeConverter createConverter(boolean nullable, ValueWriter writer) {
+                if (!nullable) {
+                    return writer::write;
+                }
+                return (row, column, cv) -> {
+                    if (!row.isNullAt(column)) {
+                        writer.write(row, column, cv);
+                    } else {
+                        cv.appendNull();
+                    }
+                };
+            }
+
+            /** Converter that reads the field with getBinary and appends it to a bytes vector. */
+            private static TypeConverter binaryConverter(boolean nullable) {
+                ValueWriter writer =
+                        (row, column, cv) -> {
+                            byte[] content = row.getBinary(column);
+                            WritableBytesVector target = (WritableBytesVector) cv;
+                            target.appendByteArray(content, 0, content.length);
+                        };
+                return createConverter(nullable, writer);
+            }
+
+            /**
+             * Converter for timestamp fields. A timestamp vector receives the timestamp as is; a
+             * long vector receives milliseconds when {@code precision <= 3} and microseconds when
+             * {@code precision <= 6}. Every other combination is rejected.
+             */
+            private static TypeConverter timestampConverter(boolean nullable, int precision) {
+                ValueWriter writer =
+                        (row, column, cv) -> {
+                            Timestamp value = row.getTimestamp(column, precision);
+                            if (cv instanceof WritableTimestampVector) {
+                                ((WritableTimestampVector) cv).appendTimestamp(value);
+                                return;
+                            }
+                            if (cv instanceof WritableLongVector && precision <= 6) {
+                                WritableLongVector longs = (WritableLongVector) cv;
+                                longs.appendLong(
+                                        precision <= 3 ? value.getMillisecond() : value.toMicros());
+                                return;
+                            }
+                            throw new UnsupportedOperationException(
+                                    "Unsupported column vector: " + cv);
+                        };
+                return createConverter(nullable, writer);
+            }
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractArrayBasedVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractArrayBasedVector.java
index 14aed29b0d..d9879e051d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractArrayBasedVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractArrayBasedVector.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.data.columnar.heap;
 
 import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 
 import java.util.Arrays;
 
@@ -63,6 +64,21 @@ public class AbstractArrayBasedVector extends 
AbstractStructVector {
         }
     }
 
+    /**
+     * Appends one entry that spans {@code length} elements of the child vectors.
+     *
+     * <p>Reserves room for one more entry here and for {@code length} more elements in every
+     * writable child, then records the entry as (offset = elements already appended to
+     * {@code children[0]}, length = {@code length}). The element values themselves are not
+     * written by this method.
+     */
+    public void appendArray(int length) {
+        reserve(elementsAppended + 1);
+        for (ColumnVector child : children) {
+            if (child instanceof WritableColumnVector) {
+                WritableColumnVector writableColumnVector = 
(WritableColumnVector) child;
+                
writableColumnVector.reserve(writableColumnVector.getElementsAppended() + 
length);
+            }
+        }
+        // NOTE(review): when children[0] is not writable no offset/length is stored, yet the
+        // entry count below still advances -- confirm this case cannot occur.
+        if (children[0] instanceof WritableColumnVector) {
+            WritableColumnVector arrayData = (WritableColumnVector) 
children[0];
+            putOffsetLength(elementsAppended, arrayData.getElementsAppended(), 
length);
+        }
+        elementsAppended++;
+    }
+
     @Override
     public void reset() {
         super.reset();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java
index 32e298ac42..8236d2ffe8 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java
@@ -81,6 +81,13 @@ public class HeapBooleanVector extends AbstractHeapVector 
implements WritableBoo
         setBooleans(rowId, 8, src, 0);
     }
 
+    /**
+     * Stores {@code v} at index {@code elementsAppended} after calling {@code reserve} for one
+     * more element, then increments {@code elementsAppended}.
+     */
+    @Override
+    public void appendBoolean(boolean v) {
+        reserve(elementsAppended + 1);
+        setBoolean(elementsAppended, v);
+        elementsAppended++;
+    }
+
     @Override
     public void fill(boolean value) {
         Arrays.fill(vector, value);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java
index 08539979fd..0f7c6224c5 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java
@@ -58,6 +58,13 @@ public class HeapByteVector extends AbstractHeapVector 
implements WritableByteVe
         Arrays.fill(vector, value);
     }
 
+    /**
+     * Stores {@code v} at index {@code elementsAppended} after calling {@code reserve} for one
+     * more element, then increments {@code elementsAppended}.
+     */
+    @Override
+    public void appendByte(byte v) {
+        reserve(elementsAppended + 1);
+        setByte(elementsAppended, v);
+        elementsAppended++;
+    }
+
     @Override
     void reserveForHeapVector(int newCapacity) {
         if (vector.length < newCapacity) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
index 3320ee7f4f..be0c05eee6 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
@@ -87,6 +87,13 @@ public class HeapBytesVector extends AbstractHeapVector 
implements WritableBytes
         bytesAppended += length;
     }
 
+    /**
+     * Stores {@code length} bytes of {@code value}, starting at {@code offset}, as the element
+     * at index {@code elementsAppended} (via {@code putByteArray}) after calling {@code reserve}
+     * for one more element, then increments {@code elementsAppended}.
+     */
+    @Override
+    public void appendByteArray(byte[] value, int offset, int length) {
+        reserve(elementsAppended + 1);
+        putByteArray(elementsAppended, value, offset, length);
+        elementsAppended++;
+    }
+
     @Override
     public void fill(byte[] value) {
         reserveBytes(start.length * value.length);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java
index b49d161ca4..d401a5c14a 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java
@@ -90,6 +90,13 @@ public class HeapDoubleVector extends AbstractHeapVector 
implements WritableDoub
         }
     }
 
+    /**
+     * Stores {@code v} at index {@code elementsAppended} after calling {@code reserve} for one
+     * more element, then increments {@code elementsAppended}.
+     */
+    @Override
+    public void appendDouble(double v) {
+        reserve(elementsAppended + 1);
+        setDouble(elementsAppended, v);
+        elementsAppended++;
+    }
+
     @Override
     public void fill(double value) {
         Arrays.fill(vector, value);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java
index d12d9b8b1e..24aff19c33 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java
@@ -89,6 +89,13 @@ public class HeapFloatVector extends AbstractHeapVector 
implements WritableFloat
         }
     }
 
+    /**
+     * Stores {@code v} at index {@code elementsAppended} after calling {@code reserve} for one
+     * more element, then increments {@code elementsAppended}.
+     */
+    @Override
+    public void appendFloat(float v) {
+        reserve(elementsAppended + 1);
+        setFloat(elementsAppended, v);
+        elementsAppended++;
+    }
+
     @Override
     public void fill(float value) {
         Arrays.fill(vector, value);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java
index 1e2eed3503..d6a9170728 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java
@@ -84,6 +84,13 @@ public class HeapLongVector extends AbstractHeapVector 
implements WritableLongVe
         }
     }
 
+    /**
+     * Stores {@code v} at index {@code elementsAppended} after calling {@code reserve} for one
+     * more element, then increments {@code elementsAppended}.
+     */
+    @Override
+    public void appendLong(long v) {
+        reserve(elementsAppended + 1);
+        setLong(elementsAppended, v);
+        elementsAppended++;
+    }
+
     @Override
     public void fill(long value) {
         Arrays.fill(vector, value);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapMapVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapMapVector.java
index b1d94575cb..1623e1ecd4 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapMapVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapMapVector.java
@@ -34,10 +34,18 @@ public class HeapMapVector extends AbstractArrayBasedVector 
implements MapColumn
         children[0] = keys;
     }
 
+    /** Returns {@code children[0]}, the slot in which the keys vector is stored. */
+    public ColumnVector getKeys() {
+        return children[0];
+    }
+
     public void setValues(ColumnVector values) {
         children[1] = values;
     }
 
+    /** Returns {@code children[1]}, the slot that {@code setValues} assigns. */
+    public ColumnVector getValues() {
+        return children[1];
+    }
+
     @Override
     public InternalMap getMap(int i) {
         long offset = offsets[i];
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
index 6f1bec9ef9..a031a5d260 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
@@ -52,6 +52,11 @@ public class HeapRowVector extends AbstractStructVector
         // Nothing to store.
     }
 
+    /**
+     * Registers one more row: calls {@code reserve} for one more element and increments
+     * {@code elementsAppended}. The child vectors are not touched by this method.
+     */
+    public void appendRow() {
+        reserve(elementsAppended + 1);
+        elementsAppended++;
+    }
+
     public void setFields(WritableColumnVector[] fields) {
         System.arraycopy(fields, 0, this.children, 0, fields.length);
         this.vectorizedColumnBatch = new VectorizedColumnBatch(children);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java
index 86ff1bd01c..8f7335d1c7 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java
@@ -60,6 +60,13 @@ public class HeapShortVector extends AbstractHeapVector 
implements WritableShort
         vector[i] = value;
     }
 
+    /**
+     * Stores {@code v} at index {@code elementsAppended} after calling {@code reserve} for one
+     * more element, then increments {@code elementsAppended}.
+     */
+    @Override
+    public void appendShort(short v) {
+        reserve(elementsAppended + 1);
+        setShort(elementsAppended, v);
+        elementsAppended++;
+    }
+
     @Override
     public void fill(short value) {
         Arrays.fill(vector, value);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java
index aa7aec9b1e..d79b5e7547 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java
@@ -60,6 +60,13 @@ public class HeapTimestampVector extends AbstractHeapVector 
implements WritableT
         nanoOfMilliseconds[i] = timestamp.getNanoOfMillisecond();
     }
 
+    /**
+     * Stores {@code timestamp} at index {@code elementsAppended} after calling {@code reserve}
+     * for one more element, then increments {@code elementsAppended}.
+     */
+    @Override
+    public void appendTimestamp(Timestamp timestamp) {
+        reserve(elementsAppended + 1);
+        setTimestamp(elementsAppended, timestamp);
+        elementsAppended++;
+    }
+
     @Override
     public void fill(Timestamp value) {
         Arrays.fill(milliseconds, value.getMillisecond());
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBooleanVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBooleanVector.java
index 558da35b01..d1c8c86ab5 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBooleanVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBooleanVector.java
@@ -32,6 +32,8 @@ public interface WritableBooleanVector extends 
WritableColumnVector, BooleanColu
 
     void setBooleans(int rowId, byte src);
 
+    /** Append the provided value as the next element of the column vector. */
+    void appendBoolean(boolean v);
+
     /** Fill the column vector with the provided value. */
     void fill(boolean value);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableByteVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableByteVector.java
index 802ea29888..04c2e02477 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableByteVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableByteVector.java
@@ -28,4 +28,6 @@ public interface WritableByteVector extends 
WritableColumnVector, ByteColumnVect
 
     /** Fill the column vector with the provided value. */
     void fill(byte value);
+
+    /** Append the provided value as the next element of the column vector. */
+    void appendByte(byte v);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBytesVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBytesVector.java
index 6cba184835..49d1bdf537 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBytesVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBytesVector.java
@@ -29,6 +29,8 @@ public interface WritableBytesVector extends 
WritableColumnVector, BytesColumnVe
      */
     void putByteArray(int rowId, byte[] value, int offset, int length);
 
+    /**
+     * Append {@code length} bytes of {@code value}, starting at {@code offset}, as the next
+     * element of the column vector.
+     */
+    void appendByteArray(byte[] value, int offset, int length);
+
     /** Fill the column vector with the provided value. */
     void fill(byte[] value);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableColumnVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableColumnVector.java
index 5dff02ae11..c03a171537 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableColumnVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableColumnVector.java
@@ -33,6 +33,13 @@ public interface WritableColumnVector extends ColumnVector {
     /** Set nulls from rowId to rowId + count (exclude). */
     void setNulls(int rowId, int count);
 
+    /**
+     * Append a null as the next element: calls {@code reserve} for one more element than has been
+     * appended so far, marks the slot at the current appended-element count as null, then advances
+     * that count by one.
+     */
+    default void appendNull() {
+        // The slot to write is the one right after the elements appended so far.
+        int nextRowId = getElementsAppended();
+        reserve(nextRowId + 1);
+        setNullAt(nextRowId);
+        addElementsAppended(1);
+    }
+
     /** Fill the column vector with nulls. */
     void fillWithNulls();
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableDoubleVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableDoubleVector.java
index 5b493b6568..049fc30fb6 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableDoubleVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableDoubleVector.java
@@ -36,6 +36,8 @@ public interface WritableDoubleVector extends 
WritableColumnVector, DoubleColumn
      */
     void setDoublesFromBinary(int rowId, int count, byte[] src, int srcIndex);
 
+    /**
+     * Append the provided value after the elements appended so far. NOTE(review): presumably
+     * grows the vector as needed, like {@code WritableColumnVector#appendNull()} - confirm.
+     */
+    void appendDouble(double v);
+
     /** Fill the column vector with the provided value. */
     void fill(double value);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableFloatVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableFloatVector.java
index 62e1f4a75e..b94f979dc6 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableFloatVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableFloatVector.java
@@ -36,6 +36,8 @@ public interface WritableFloatVector extends 
WritableColumnVector, FloatColumnVe
      */
     void setFloatsFromBinary(int rowId, int count, byte[] src, int srcIndex);
 
+    /**
+     * Append the provided value after the elements appended so far. NOTE(review): presumably
+     * grows the vector as needed, like {@code WritableColumnVector#appendNull()} - confirm.
+     */
+    void appendFloat(float v);
+
     /** Fill the column vector with the provided value. */
     void fill(float value);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableLongVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableLongVector.java
index da9b4aaeb2..158753145b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableLongVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableLongVector.java
@@ -36,6 +36,8 @@ public interface WritableLongVector extends 
WritableColumnVector, LongColumnVect
      */
     void setLongsFromBinary(int rowId, int count, byte[] src, int srcIndex);
 
+    /**
+     * Append the provided value after the elements appended so far. NOTE(review): presumably
+     * grows the vector as needed, like {@code WritableColumnVector#appendNull()} - confirm.
+     */
+    void appendLong(long v);
+
     /** Fill the column vector with the provided value. */
     void fill(long value);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableShortVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableShortVector.java
index 94b70305c7..59077e0386 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableShortVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableShortVector.java
@@ -26,6 +26,8 @@ public interface WritableShortVector extends 
WritableColumnVector, ShortColumnVe
     /** Set short at rowId with the provided value. */
     void setShort(int rowId, short value);
 
+    /**
+     * Append the provided value after the elements appended so far. NOTE(review): presumably
+     * grows the vector as needed, like {@code WritableColumnVector#appendNull()} - confirm.
+     */
+    void appendShort(short v);
+
     /** Fill the column vector with the provided value. */
     void fill(short value);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableTimestampVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableTimestampVector.java
index e7d308b62d..586c5f092c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableTimestampVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableTimestampVector.java
@@ -27,6 +27,8 @@ public interface WritableTimestampVector extends 
WritableColumnVector, Timestamp
     /** Set {@link Timestamp} at rowId with the provided value. */
     void setTimestamp(int rowId, Timestamp timestamp);
 
+    /**
+     * Append the provided {@link Timestamp} after the elements appended so far. NOTE(review):
+     * presumably grows the vector as needed, like {@code WritableColumnVector#appendNull()} -
+     * confirm.
+     */
+    void appendTimestamp(Timestamp timestamp);
+
     /** Fill the column vector with the provided value. */
     void fill(Timestamp value);
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java
new file mode 100644
index 0000000000..3d28183385
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapArrayVector;
+import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
+import org.apache.paimon.data.columnar.heap.HeapByteVector;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
+import org.apache.paimon.data.columnar.heap.HeapDoubleVector;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.data.columnar.heap.HeapLongVector;
+import org.apache.paimon.data.columnar.heap.HeapMapVector;
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.heap.HeapShortVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link RowToColumnConverter}.
+ *
+ * <p>Every test builds a single-field {@link RowType}, feeds rows through the converter into
+ * pre-allocated heap vectors and then reads the values back from those vectors.
+ */
+public class RowToColumnConverterTest {
+
+    /**
+     * Charset used for every byte/String round trip in this class, so the assertions do not depend
+     * on the JVM's platform default charset.
+     */
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+    @Test
+    public void testConvertIntType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new IntType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapIntVector intVector = new HeapIntVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {intVector};
+
+        convertAll(converter, vectors, GenericRow.of(123), GenericRow.of(456));
+
+        assertThat(intVector.getInt(0)).isEqualTo(123);
+        assertThat(intVector.getInt(1)).isEqualTo(456);
+    }
+
+    @Test
+    public void testConvertNullableIntType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new IntType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapIntVector intVector = new HeapIntVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {intVector};
+
+        // The cast keeps the null from being taken as the whole varargs array.
+        convertAll(converter, vectors, GenericRow.of((Object) null), GenericRow.of(789));
+
+        assertThat(intVector.isNullAt(0)).isTrue();
+        assertThat(intVector.getInt(1)).isEqualTo(789);
+    }
+
+    @Test
+    public void testConvertBooleanType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new BooleanType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapBooleanVector booleanVector = new HeapBooleanVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {booleanVector};
+
+        convertAll(converter, vectors, GenericRow.of(true), GenericRow.of(false));
+
+        assertThat(booleanVector.getBoolean(0)).isTrue();
+        assertThat(booleanVector.getBoolean(1)).isFalse();
+    }
+
+    @Test
+    public void testConvertByteType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new TinyIntType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapByteVector byteVector = new HeapByteVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {byteVector};
+
+        convertAll(converter, vectors, GenericRow.of((byte) 123), GenericRow.of((byte) 45));
+
+        assertThat(byteVector.getByte(0)).isEqualTo((byte) 123);
+        assertThat(byteVector.getByte(1)).isEqualTo((byte) 45);
+    }
+
+    @Test
+    public void testConvertShortType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new SmallIntType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapShortVector shortVector = new HeapShortVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {shortVector};
+
+        convertAll(converter, vectors, GenericRow.of((short) 1234), GenericRow.of((short) 5678));
+
+        assertThat(shortVector.getShort(0)).isEqualTo((short) 1234);
+        assertThat(shortVector.getShort(1)).isEqualTo((short) 5678);
+    }
+
+    @Test
+    public void testConvertLongType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new BigIntType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapLongVector longVector = new HeapLongVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {longVector};
+
+        convertAll(converter, vectors, GenericRow.of(123456789L), GenericRow.of(987654321L));
+
+        assertThat(longVector.getLong(0)).isEqualTo(123456789L);
+        assertThat(longVector.getLong(1)).isEqualTo(987654321L);
+    }
+
+    @Test
+    public void testConvertFloatType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new FloatType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapFloatVector floatVector = new HeapFloatVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {floatVector};
+
+        convertAll(converter, vectors, GenericRow.of(123.45f), GenericRow.of(678.90f));
+
+        assertThat(floatVector.getFloat(0)).isEqualTo(123.45f);
+        assertThat(floatVector.getFloat(1)).isEqualTo(678.90f);
+    }
+
+    @Test
+    public void testConvertDoubleType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new DoubleType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapDoubleVector doubleVector = new HeapDoubleVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {doubleVector};
+
+        convertAll(converter, vectors, GenericRow.of(123.456789D), GenericRow.of(987.654321D));
+
+        assertThat(doubleVector.getDouble(0)).isEqualTo(123.456789D);
+        assertThat(doubleVector.getDouble(1)).isEqualTo(987.654321D);
+    }
+
+    @Test
+    public void testConvertStringType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new VarCharType(10)));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapBytesVector bytesVector = new HeapBytesVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {bytesVector};
+
+        convertAll(
+                converter,
+                vectors,
+                GenericRow.of(BinaryString.fromString("hello")),
+                GenericRow.of(BinaryString.fromString("world")));
+
+        assertThat(utf8(bytesVector, 0)).isEqualTo("hello");
+        assertThat(utf8(bytesVector, 1)).isEqualTo("world");
+    }
+
+    @Test
+    public void testConvertBinaryType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new BinaryType(5)));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapBytesVector bytesVector = new HeapBytesVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {bytesVector};
+
+        // The (Object) cast passes each byte[] as one field rather than as the varargs array.
+        convertAll(
+                converter,
+                vectors,
+                GenericRow.of((Object) "hello".getBytes(UTF_8)),
+                GenericRow.of((Object) "world".getBytes(UTF_8)));
+
+        assertThat(utf8(bytesVector, 0)).isEqualTo("hello");
+        assertThat(utf8(bytesVector, 1)).isEqualTo("world");
+    }
+
+    @Test
+    public void testConvertVarBinaryType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new VarBinaryType(10)));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapBytesVector bytesVector = new HeapBytesVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {bytesVector};
+
+        // The (Object) cast passes each byte[] as one field rather than as the varargs array.
+        convertAll(
+                converter,
+                vectors,
+                GenericRow.of((Object) "hello".getBytes(UTF_8)),
+                GenericRow.of((Object) "world".getBytes(UTF_8)));
+
+        assertThat(utf8(bytesVector, 0)).isEqualTo("hello");
+        assertThat(utf8(bytesVector, 1)).isEqualTo("world");
+    }
+
+    @Test
+    public void testConvertDecimalType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new DecimalType(10, 2)));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        Decimal d1 = Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2);
+        Decimal d2 = Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2);
+
+        HeapLongVector longVector = new HeapLongVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {longVector};
+
+        convertAll(converter, vectors, GenericRow.of(d1), GenericRow.of(d2));
+
+        // The long vector is expected to hold the unscaled value of each decimal.
+        assertThat(longVector.getLong(0)).isEqualTo(d1.toUnscaledLong());
+        assertThat(longVector.getLong(1)).isEqualTo(d2.toUnscaledLong());
+    }
+
+    @Test
+    public void testConvertTimestampType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new TimestampType(3)));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        Timestamp timestamp1 = Timestamp.fromEpochMillis(1234567890L);
+        Timestamp timestamp2 = Timestamp.fromEpochMillis(9876543210L);
+
+        HeapLongVector longVector = new HeapLongVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {longVector};
+
+        convertAll(converter, vectors, GenericRow.of(timestamp1), GenericRow.of(timestamp2));
+
+        // With precision 3 the long vector is expected to hold the epoch milliseconds.
+        assertThat(longVector.getLong(0)).isEqualTo(1234567890L);
+        assertThat(longVector.getLong(1)).isEqualTo(9876543210L);
+    }
+
+    @Test
+    public void testConvertDateType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new DateType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapIntVector intVector = new HeapIntVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {intVector};
+
+        convertAll(converter, vectors, GenericRow.of(12345), GenericRow.of(67890));
+
+        assertThat(intVector.getInt(0)).isEqualTo(12345);
+        assertThat(intVector.getInt(1)).isEqualTo(67890);
+    }
+
+    @Test
+    public void testConvertTimeType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new TimeType()));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapIntVector intVector = new HeapIntVector(2);
+        WritableColumnVector[] vectors = new WritableColumnVector[] {intVector};
+
+        convertAll(converter, vectors, GenericRow.of(12345), GenericRow.of(67890));
+
+        assertThat(intVector.getInt(0)).isEqualTo(12345);
+        assertThat(intVector.getInt(1)).isEqualTo(67890);
+    }
+
+    @Test
+    public void testConvertArrayType() {
+        RowType rowType = RowType.of(new DataField(0, "f", new ArrayType(new IntType())));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        HeapIntVector elementVector = new HeapIntVector(6);
+        HeapArrayVector arrayVector = new HeapArrayVector(2, elementVector);
+        arrayVector.putOffsetLength(0, 0, 3);
+        arrayVector.putOffsetLength(1, 3, 3);
+
+        WritableColumnVector[] vectors = new WritableColumnVector[] {arrayVector};
+
+        convertAll(
+                converter,
+                vectors,
+                GenericRow.of(new GenericArray(new Object[] {1, 2, 3})),
+                GenericRow.of(new GenericArray(new Object[] {4, 5, 6})));
+
+        for (int i = 0; i < 6; i++) {
+            assertThat(elementVector.getInt(i)).isEqualTo(i + 1);
+        }
+
+        // test reserve: elements 6 and 7 lie beyond the initial element-vector size of 6
+        convertAll(converter, vectors, GenericRow.of(new GenericArray(new Object[] {7, 8})));
+        assertThat(elementVector.getInt(6)).isEqualTo(7);
+        assertThat(elementVector.getInt(7)).isEqualTo(8);
+    }
+
+    @Test
+    public void testConvertMapType() {
+        RowType rowType =
+                RowType.of(
+                        new DataField(0, "f", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        // LinkedHashMap keeps insertion order, which the positional assertions below rely on.
+        Map<BinaryString, Integer> map1 = new LinkedHashMap<>();
+        map1.put(BinaryString.fromString("a"), 1);
+        map1.put(BinaryString.fromString("b"), 2);
+
+        Map<BinaryString, Integer> map2 = new LinkedHashMap<>();
+        map2.put(BinaryString.fromString("c"), 3);
+        map2.put(BinaryString.fromString("d"), 4);
+
+        HeapBytesVector keyVector = new HeapBytesVector(4);
+        HeapIntVector valueVector = new HeapIntVector(4);
+        HeapMapVector mapVector = new HeapMapVector(2, keyVector, valueVector);
+        mapVector.putOffsetLength(0, 0, 2);
+        mapVector.putOffsetLength(1, 2, 2);
+
+        WritableColumnVector[] vectors = new WritableColumnVector[] {mapVector};
+
+        convertAll(
+                converter,
+                vectors,
+                GenericRow.of(new GenericMap(map1)),
+                GenericRow.of(new GenericMap(map2)));
+
+        assertThat(utf8(keyVector, 0)).isEqualTo("a");
+        assertThat(valueVector.getInt(0)).isEqualTo(1);
+        assertThat(utf8(keyVector, 1)).isEqualTo("b");
+        assertThat(valueVector.getInt(1)).isEqualTo(2);
+        assertThat(utf8(keyVector, 2)).isEqualTo("c");
+        assertThat(valueVector.getInt(2)).isEqualTo(3);
+        assertThat(utf8(keyVector, 3)).isEqualTo("d");
+        assertThat(valueVector.getInt(3)).isEqualTo(4);
+
+        // test reserve: entry 4 lies beyond the initial key/value vector size of 4
+        Map<BinaryString, Integer> map3 = new LinkedHashMap<>();
+        map3.put(BinaryString.fromString("e"), 5);
+        convertAll(converter, vectors, GenericRow.of(new GenericMap(map3)));
+        assertThat(utf8(keyVector, 4)).isEqualTo("e");
+        assertThat(valueVector.getInt(4)).isEqualTo(5);
+    }
+
+    @Test
+    public void testConvertRowType() {
+        RowType rowType =
+                RowType.of(
+                        new DataField(
+                                0,
+                                "f",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD(0, "id", DataTypes.INT()),
+                                        DataTypes.FIELD(1, "name", DataTypes.STRING()))));
+        RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+        GenericRow rowValue1 = GenericRow.of(1, BinaryString.fromString("Alice"));
+        GenericRow rowValue2 = GenericRow.of(2, BinaryString.fromString("Bob"));
+
+        HeapIntVector idVector = new HeapIntVector(2);
+        HeapBytesVector nameVector = new HeapBytesVector(2);
+        HeapRowVector rowVector = new HeapRowVector(2, idVector, nameVector);
+
+        WritableColumnVector[] vectors = new WritableColumnVector[] {rowVector};
+
+        convertAll(converter, vectors, GenericRow.of(rowValue1), GenericRow.of(rowValue2));
+
+        assertThat(idVector.getInt(0)).isEqualTo(1);
+        assertThat(utf8(nameVector, 0)).isEqualTo("Alice");
+        assertThat(idVector.getInt(1)).isEqualTo(2);
+        assertThat(utf8(nameVector, 1)).isEqualTo("Bob");
+
+        // test reserve: row 2 lies beyond the initial vector size of 2
+        GenericRow row3 = GenericRow.of(GenericRow.of(3, BinaryString.fromString("Charlie")));
+        convertAll(converter, vectors, row3);
+        assertThat(idVector.getInt(2)).isEqualTo(3);
+        assertThat(utf8(nameVector, 2)).isEqualTo("Charlie");
+        assertThat(rowVector.getRow(2).getInt(0)).isEqualTo(3);
+    }
+
+    /** Feeds every row, in the given order, through {@code converter} into {@code vectors}. */
+    private static void convertAll(
+            RowToColumnConverter converter, WritableColumnVector[] vectors, GenericRow... rows) {
+        for (GenericRow row : rows) {
+            converter.convert(row, vectors);
+        }
+    }
+
+    /** Decodes the bytes stored at {@code rowId} of {@code vector} as a UTF-8 string. */
+    private static String utf8(HeapBytesVector vector, int rowId) {
+        return new String(vector.getBytes(rowId).getBytes(), UTF_8);
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
index 543005d7d2..5ac0c2e5d2 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
@@ -163,6 +163,13 @@ public class ParquetDecimalVector
         }
     }
 
+    /**
+     * Forwards the append to the wrapped vector when it is a {@link WritableBytesVector};
+     * otherwise does nothing.
+     */
+    @Override
+    public void appendByteArray(byte[] value, int offset, int length) {
+        if (!(vector instanceof WritableBytesVector)) {
+            // Wrapped vector cannot take byte arrays: the call is silently ignored.
+            return;
+        }
+        WritableBytesVector bytesVector = (WritableBytesVector) vector;
+        bytesVector.appendByteArray(value, offset, length);
+    }
+
     @Override
     public void fill(byte[] value) {
         if (vector instanceof WritableBytesVector) {
@@ -242,6 +249,13 @@ public class ParquetDecimalVector
         }
     }
 
+    /**
+     * Forwards the append to the wrapped vector when it is a {@link WritableLongVector};
+     * otherwise does nothing.
+     */
+    @Override
+    public void appendLong(long v) {
+        if (!(vector instanceof WritableLongVector)) {
+            // Wrapped vector cannot take longs: the call is silently ignored.
+            return;
+        }
+        WritableLongVector longVector = (WritableLongVector) vector;
+        longVector.appendLong(v);
+    }
+
     @Override
     public void setLongsFromBinary(int rowId, int count, byte[] src, int 
srcIndex) {
         if (vector instanceof WritableLongVector) {

Reply via email to