This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 080a3ef89f [core] Introduce RowToColumnConverter (#6673)
080a3ef89f is described below
commit 080a3ef89f9fd9dc684cd8c357b7fe9c6cde107f
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Nov 26 10:17:35 2025 +0800
[core] Introduce RowToColumnConverter (#6673)
---
.../paimon/data/columnar/RowToColumnConverter.java | 373 +++++++++++++++++
.../columnar/heap/AbstractArrayBasedVector.java | 16 +
.../data/columnar/heap/HeapBooleanVector.java | 7 +
.../paimon/data/columnar/heap/HeapByteVector.java | 7 +
.../paimon/data/columnar/heap/HeapBytesVector.java | 7 +
.../data/columnar/heap/HeapDoubleVector.java | 7 +
.../paimon/data/columnar/heap/HeapFloatVector.java | 7 +
.../paimon/data/columnar/heap/HeapLongVector.java | 7 +
.../paimon/data/columnar/heap/HeapMapVector.java | 8 +
.../paimon/data/columnar/heap/HeapRowVector.java | 5 +
.../paimon/data/columnar/heap/HeapShortVector.java | 7 +
.../data/columnar/heap/HeapTimestampVector.java | 7 +
.../columnar/writable/WritableBooleanVector.java | 2 +
.../data/columnar/writable/WritableByteVector.java | 2 +
.../columnar/writable/WritableBytesVector.java | 2 +
.../columnar/writable/WritableColumnVector.java | 7 +
.../columnar/writable/WritableDoubleVector.java | 2 +
.../columnar/writable/WritableFloatVector.java | 2 +
.../data/columnar/writable/WritableLongVector.java | 2 +
.../columnar/writable/WritableShortVector.java | 2 +
.../columnar/writable/WritableTimestampVector.java | 2 +
.../data/columnar/RowToColumnConverterTest.java | 461 +++++++++++++++++++++
.../parquet/reader/ParquetDecimalVector.java | 14 +
23 files changed, 956 insertions(+)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
new file mode 100644
index 0000000000..d2a378846c
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapArrayVector;
+import org.apache.paimon.data.columnar.heap.HeapMapVector;
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
+import org.apache.paimon.data.columnar.writable.WritableByteVector;
+import org.apache.paimon.data.columnar.writable.WritableBytesVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableDoubleVector;
+import org.apache.paimon.data.columnar.writable.WritableFloatVector;
+import org.apache.paimon.data.columnar.writable.WritableIntVector;
+import org.apache.paimon.data.columnar.writable.WritableLongVector;
+import org.apache.paimon.data.columnar.writable.WritableShortVector;
+import org.apache.paimon.data.columnar.writable.WritableTimestampVector;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BlobType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.types.VariantType;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Covert row based data to columnar data. */
+public class RowToColumnConverter {
+
+ private final TypeConverter[] converters;
+
+ public RowToColumnConverter(RowType rowType) {
+ List<DataType> fieldTypes = rowType.getFieldTypes();
+ this.converters = new TypeConverter[fieldTypes.size()];
+ for (int i = 0; i < fieldTypes.size(); i++) {
+ converters[i] =
TypeConverter.getConverterForType(fieldTypes.get(i));
+ }
+ }
+
+ public void convert(InternalRow row, WritableColumnVector[] vectors) {
+ for (int i = 0; i < row.getFieldCount(); i++) {
+ converters[i].append(row, i, vectors[i]);
+ }
+ }
+
+ private interface TypeConverter extends Serializable {
+
+ void append(DataGetters row, int column, WritableColumnVector cv);
+
+ static TypeConverter getConverterForType(DataType dataType) {
+ return dataType.accept(TypeConverterVisitor.INSTANCE);
+ }
+
+ class TypeConverterVisitor implements DataTypeVisitor<TypeConverter> {
+
+ static final TypeConverterVisitor INSTANCE = new
TypeConverterVisitor();
+
+ @FunctionalInterface
+ interface ValueWriter {
+ void write(DataGetters row, int column, WritableColumnVector
cv);
+ }
+
+ @Override
+ public TypeConverter visit(CharType charType) {
+ return createConverter(
+ charType.isNullable(),
+ (row, column, cv) ->
+ ((WritableByteVector)
cv).appendByte(row.getByte(column)));
+ }
+
+ @Override
+ public TypeConverter visit(VarCharType varCharType) {
+ return createConverter(
+ varCharType.isNullable(),
+ (row, column, cv) -> {
+ byte[] bytes = row.getString(column).toBytes();
+ ((WritableBytesVector) cv).appendByteArray(bytes,
0, bytes.length);
+ });
+ }
+
+ @Override
+ public TypeConverter visit(BooleanType booleanType) {
+ return createConverter(
+ booleanType.isNullable(),
+ (row, column, cv) ->
+ ((WritableBooleanVector)
cv).appendBoolean(row.getBoolean(column)));
+ }
+
+ @Override
+ public TypeConverter visit(BinaryType binaryType) {
+ return binaryConverter(binaryType.isNullable());
+ }
+
+ @Override
+ public TypeConverter visit(VarBinaryType varBinaryType) {
+ return binaryConverter(varBinaryType.isNullable());
+ }
+
+ @Override
+ public TypeConverter visit(DecimalType decimalType) {
+ return createConverter(
+ decimalType.isNullable(),
+ (row, column, cv) -> {
+ Decimal decimal =
+ row.getDecimal(
+ column,
+ decimalType.getPrecision(),
+ decimalType.getScale());
+ if (cv instanceof WritableIntVector) {
+ ((WritableIntVector) cv).appendInt((int)
decimal.toUnscaledLong());
+ } else if (cv instanceof WritableLongVector) {
+ ((WritableLongVector)
cv).appendLong(decimal.toUnscaledLong());
+ } else if (cv instanceof WritableBytesVector) {
+ byte[] bytes = decimal.toUnscaledBytes();
+ ((WritableBytesVector)
cv).appendByteArray(bytes, 0, bytes.length);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported column vector: " + cv);
+ }
+ });
+ }
+
+ @Override
+ public TypeConverter visit(TinyIntType tinyIntType) {
+ return createConverter(
+ tinyIntType.isNullable(),
+ (row, column, cv) ->
+ ((WritableByteVector)
cv).appendByte(row.getByte(column)));
+ }
+
+ @Override
+ public TypeConverter visit(SmallIntType smallIntType) {
+ return createConverter(
+ smallIntType.isNullable(),
+ (row, column, cv) ->
+ ((WritableShortVector)
cv).appendShort(row.getShort(column)));
+ }
+
+ @Override
+ public TypeConverter visit(IntType intType) {
+ return createConverter(
+ intType.isNullable(),
+ (row, column, cv) ->
+ ((WritableIntVector)
cv).appendInt(row.getInt(column)));
+ }
+
+ @Override
+ public TypeConverter visit(BigIntType bigIntType) {
+ return createConverter(
+ bigIntType.isNullable(),
+ (row, column, cv) ->
+ ((WritableLongVector)
cv).appendLong(row.getLong(column)));
+ }
+
+ @Override
+ public TypeConverter visit(FloatType floatType) {
+ return createConverter(
+ floatType.isNullable(),
+ (row, column, cv) ->
+ ((WritableFloatVector)
cv).appendFloat(row.getFloat(column)));
+ }
+
+ @Override
+ public TypeConverter visit(DoubleType doubleType) {
+ return createConverter(
+ doubleType.isNullable(),
+ (row, column, cv) ->
+ ((WritableDoubleVector)
cv).appendDouble(row.getDouble(column)));
+ }
+
+ @Override
+ public TypeConverter visit(DateType dateType) {
+ return createConverter(
+ dateType.isNullable(),
+ (row, column, cv) ->
+ ((WritableIntVector)
cv).appendInt(row.getInt(column)));
+ }
+
+ @Override
+ public TypeConverter visit(TimeType timeType) {
+ return createConverter(
+ timeType.isNullable(),
+ (row, column, cv) ->
+ ((WritableIntVector)
cv).appendInt(row.getInt(column)));
+ }
+
+ @Override
+ public TypeConverter visit(TimestampType timestampType) {
+ return timestampConverter(timestampType.isNullable(),
timestampType.getPrecision());
+ }
+
+ @Override
+ public TypeConverter visit(LocalZonedTimestampType
localZonedTimestampType) {
+ return timestampConverter(
+ localZonedTimestampType.isNullable(),
+ localZonedTimestampType.getPrecision());
+ }
+
+ @Override
+ public TypeConverter visit(VariantType variantType) {
+ return createConverter(
+ variantType.isNullable(),
+ (row, column, cv) -> {
+ WritableBytesVector valueVector =
+ (WritableBytesVector) cv.getChildren()[0];
+ WritableBytesVector metaDataVector =
+ (WritableBytesVector) cv.getChildren()[1];
+
+ Variant variant = row.getVariant(column);
+ byte[] value = variant.value();
+ byte[] metadata = variant.metadata();
+ valueVector.appendByteArray(value, 0,
value.length);
+ metaDataVector.appendByteArray(metadata, 0,
metadata.length);
+ });
+ }
+
+ @Override
+ public TypeConverter visit(BlobType blobType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypeConverter visit(ArrayType arrayType) {
+ return createConverter(
+ arrayType.isNullable(),
+ (row, column, cv) -> {
+ HeapArrayVector arrayVector = (HeapArrayVector) cv;
+ InternalArray values = row.getArray(column);
+ int numElements = values.size();
+ arrayVector.appendArray(numElements);
+
+ WritableColumnVector arrData =
+ (WritableColumnVector)
arrayVector.getColumnVector();
+ TypeConverter elementConverter =
+
getConverterForType(arrayType.getElementType());
+ for (int i = 0; i < numElements; i++) {
+ elementConverter.append(values, i, arrData);
+ }
+ });
+ }
+
+ @Override
+ public TypeConverter visit(MultisetType multisetType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypeConverter visit(MapType mapType) {
+ return createConverter(
+ mapType.isNullable(),
+ (row, column, cv) -> {
+ HeapMapVector mapVector = (HeapMapVector) cv;
+ InternalMap m = row.getMap(column);
+ WritableColumnVector keys = (WritableColumnVector)
mapVector.getKeys();
+ WritableColumnVector values =
+ (WritableColumnVector)
mapVector.getValues();
+ int numElements = m.size();
+ mapVector.appendArray(numElements);
+
+ InternalArray srcKeys = m.keyArray();
+ InternalArray srcValues = m.valueArray();
+ TypeConverter keyConverter =
getConverterForType(mapType.getKeyType());
+ TypeConverter valueConverter =
+
getConverterForType(mapType.getValueType());
+ for (int i = 0; i < numElements; i++) {
+ keyConverter.append(srcKeys, i, keys);
+ valueConverter.append(srcValues, i, values);
+ }
+ });
+ }
+
+ @Override
+ public TypeConverter visit(RowType rowType) {
+ return createConverter(
+ rowType.isNullable(),
+ (row, column, cv) -> {
+ HeapRowVector rowVector = (HeapRowVector) cv;
+ rowVector.appendRow();
+ InternalRow data = row.getRow(column,
rowType.getFieldCount());
+ ColumnVector[] children = cv.getChildren();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ TypeConverter fieldConverter =
+
getConverterForType(rowType.getTypeAt(i));
+ fieldConverter.append(data, i,
(WritableColumnVector) children[i]);
+ }
+ });
+ }
+
+ private static TypeConverter createConverter(boolean nullable,
ValueWriter writer) {
+ if (nullable) {
+ return (row, column, cv) -> {
+ if (row.isNullAt(column)) {
+ cv.appendNull();
+ } else {
+ writer.write(row, column, cv);
+ }
+ };
+ } else {
+ return writer::write;
+ }
+ }
+
+ private static TypeConverter binaryConverter(boolean nullable) {
+ return createConverter(
+ nullable,
+ (row, column, cv) -> {
+ byte[] bytes = row.getBinary(column);
+ ((WritableBytesVector) cv).appendByteArray(bytes,
0, bytes.length);
+ });
+ }
+
+ private static TypeConverter timestampConverter(boolean nullable,
int precision) {
+ return createConverter(
+ nullable,
+ (row, column, cv) -> {
+ Timestamp timestamp = row.getTimestamp(column,
precision);
+ if (cv instanceof WritableTimestampVector) {
+ ((WritableTimestampVector)
cv).appendTimestamp(timestamp);
+ } else if (cv instanceof WritableLongVector &&
precision <= 3) {
+ ((WritableLongVector)
cv).appendLong(timestamp.getMillisecond());
+ } else if (cv instanceof WritableLongVector &&
precision <= 6) {
+ ((WritableLongVector)
cv).appendLong(timestamp.toMicros());
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported column vector: " + cv);
+ }
+ });
+ }
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractArrayBasedVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractArrayBasedVector.java
index 14aed29b0d..d9879e051d 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractArrayBasedVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractArrayBasedVector.java
@@ -19,6 +19,7 @@
package org.apache.paimon.data.columnar.heap;
import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import java.util.Arrays;
@@ -63,6 +64,21 @@ public class AbstractArrayBasedVector extends
AbstractStructVector {
}
}
+ public void appendArray(int length) {
+ reserve(elementsAppended + 1);
+ for (ColumnVector child : children) {
+ if (child instanceof WritableColumnVector) {
+ WritableColumnVector writableColumnVector =
(WritableColumnVector) child;
+
writableColumnVector.reserve(writableColumnVector.getElementsAppended() +
length);
+ }
+ }
+ if (children[0] instanceof WritableColumnVector) {
+ WritableColumnVector arrayData = (WritableColumnVector)
children[0];
+ putOffsetLength(elementsAppended, arrayData.getElementsAppended(),
length);
+ }
+ elementsAppended++;
+ }
+
@Override
public void reset() {
super.reset();
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java
index 32e298ac42..8236d2ffe8 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBooleanVector.java
@@ -81,6 +81,13 @@ public class HeapBooleanVector extends AbstractHeapVector
implements WritableBoo
setBooleans(rowId, 8, src, 0);
}
+ @Override
+ public void appendBoolean(boolean v) {
+ reserve(elementsAppended + 1);
+ setBoolean(elementsAppended, v);
+ elementsAppended++;
+ }
+
@Override
public void fill(boolean value) {
Arrays.fill(vector, value);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java
index 08539979fd..0f7c6224c5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapByteVector.java
@@ -58,6 +58,13 @@ public class HeapByteVector extends AbstractHeapVector
implements WritableByteVe
Arrays.fill(vector, value);
}
+ @Override
+ public void appendByte(byte v) {
+ reserve(elementsAppended + 1);
+ setByte(elementsAppended, v);
+ elementsAppended++;
+ }
+
@Override
void reserveForHeapVector(int newCapacity) {
if (vector.length < newCapacity) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
index 3320ee7f4f..be0c05eee6 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java
@@ -87,6 +87,13 @@ public class HeapBytesVector extends AbstractHeapVector
implements WritableBytes
bytesAppended += length;
}
+ @Override
+ public void appendByteArray(byte[] value, int offset, int length) {
+ reserve(elementsAppended + 1);
+ putByteArray(elementsAppended, value, offset, length);
+ elementsAppended++;
+ }
+
@Override
public void fill(byte[] value) {
reserveBytes(start.length * value.length);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java
index b49d161ca4..d401a5c14a 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapDoubleVector.java
@@ -90,6 +90,13 @@ public class HeapDoubleVector extends AbstractHeapVector
implements WritableDoub
}
}
+ @Override
+ public void appendDouble(double v) {
+ reserve(elementsAppended + 1);
+ setDouble(elementsAppended, v);
+ elementsAppended++;
+ }
+
@Override
public void fill(double value) {
Arrays.fill(vector, value);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java
index d12d9b8b1e..24aff19c33 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapFloatVector.java
@@ -89,6 +89,13 @@ public class HeapFloatVector extends AbstractHeapVector
implements WritableFloat
}
}
+ @Override
+ public void appendFloat(float v) {
+ reserve(elementsAppended + 1);
+ setFloat(elementsAppended, v);
+ elementsAppended++;
+ }
+
@Override
public void fill(float value) {
Arrays.fill(vector, value);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java
index 1e2eed3503..d6a9170728 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapLongVector.java
@@ -84,6 +84,13 @@ public class HeapLongVector extends AbstractHeapVector
implements WritableLongVe
}
}
+ @Override
+ public void appendLong(long v) {
+ reserve(elementsAppended + 1);
+ setLong(elementsAppended, v);
+ elementsAppended++;
+ }
+
@Override
public void fill(long value) {
Arrays.fill(vector, value);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapMapVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapMapVector.java
index b1d94575cb..1623e1ecd4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapMapVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapMapVector.java
@@ -34,10 +34,18 @@ public class HeapMapVector extends AbstractArrayBasedVector
implements MapColumn
children[0] = keys;
}
+ public ColumnVector getKeys() {
+ return children[0];
+ }
+
public void setValues(ColumnVector values) {
children[1] = values;
}
+ public ColumnVector getValues() {
+ return children[1];
+ }
+
@Override
public InternalMap getMap(int i) {
long offset = offsets[i];
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
index 6f1bec9ef9..a031a5d260 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
@@ -52,6 +52,11 @@ public class HeapRowVector extends AbstractStructVector
// Nothing to store.
}
+ public void appendRow() {
+ reserve(elementsAppended + 1);
+ elementsAppended++;
+ }
+
public void setFields(WritableColumnVector[] fields) {
System.arraycopy(fields, 0, this.children, 0, fields.length);
this.vectorizedColumnBatch = new VectorizedColumnBatch(children);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java
index 86ff1bd01c..8f7335d1c7 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapShortVector.java
@@ -60,6 +60,13 @@ public class HeapShortVector extends AbstractHeapVector
implements WritableShort
vector[i] = value;
}
+ @Override
+ public void appendShort(short v) {
+ reserve(elementsAppended + 1);
+ setShort(elementsAppended, v);
+ elementsAppended++;
+ }
+
@Override
public void fill(short value) {
Arrays.fill(vector, value);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java
index aa7aec9b1e..d79b5e7547 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapTimestampVector.java
@@ -60,6 +60,13 @@ public class HeapTimestampVector extends AbstractHeapVector
implements WritableT
nanoOfMilliseconds[i] = timestamp.getNanoOfMillisecond();
}
+ @Override
+ public void appendTimestamp(Timestamp timestamp) {
+ reserve(elementsAppended + 1);
+ setTimestamp(elementsAppended, timestamp);
+ elementsAppended++;
+ }
+
@Override
public void fill(Timestamp value) {
Arrays.fill(milliseconds, value.getMillisecond());
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBooleanVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBooleanVector.java
index 558da35b01..d1c8c86ab5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBooleanVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBooleanVector.java
@@ -32,6 +32,8 @@ public interface WritableBooleanVector extends
WritableColumnVector, BooleanColu
void setBooleans(int rowId, byte src);
+ void appendBoolean(boolean v);
+
/** Fill the column vector with the provided value. */
void fill(boolean value);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableByteVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableByteVector.java
index 802ea29888..04c2e02477 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableByteVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableByteVector.java
@@ -28,4 +28,6 @@ public interface WritableByteVector extends
WritableColumnVector, ByteColumnVect
/** Fill the column vector with the provided value. */
void fill(byte value);
+
+ void appendByte(byte v);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBytesVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBytesVector.java
index 6cba184835..49d1bdf537 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBytesVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableBytesVector.java
@@ -29,6 +29,8 @@ public interface WritableBytesVector extends
WritableColumnVector, BytesColumnVe
*/
void putByteArray(int rowId, byte[] value, int offset, int length);
+ void appendByteArray(byte[] value, int offset, int length);
+
/** Fill the column vector with the provided value. */
void fill(byte[] value);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableColumnVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableColumnVector.java
index 5dff02ae11..c03a171537 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableColumnVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableColumnVector.java
@@ -33,6 +33,13 @@ public interface WritableColumnVector extends ColumnVector {
/** Set nulls from rowId to rowId + count (exclude). */
void setNulls(int rowId, int count);
+ default void appendNull() {
+ int elementsAppended = getElementsAppended();
+ reserve(elementsAppended + 1);
+ setNullAt(elementsAppended);
+ addElementsAppended(1);
+ }
+
/** Fill the column vector with nulls. */
void fillWithNulls();
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableDoubleVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableDoubleVector.java
index 5b493b6568..049fc30fb6 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableDoubleVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableDoubleVector.java
@@ -36,6 +36,8 @@ public interface WritableDoubleVector extends
WritableColumnVector, DoubleColumn
*/
void setDoublesFromBinary(int rowId, int count, byte[] src, int srcIndex);
+ void appendDouble(double v);
+
/** Fill the column vector with the provided value. */
void fill(double value);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableFloatVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableFloatVector.java
index 62e1f4a75e..b94f979dc6 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableFloatVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableFloatVector.java
@@ -36,6 +36,8 @@ public interface WritableFloatVector extends
WritableColumnVector, FloatColumnVe
*/
void setFloatsFromBinary(int rowId, int count, byte[] src, int srcIndex);
+ void appendFloat(float v);
+
/** Fill the column vector with the provided value. */
void fill(float value);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableLongVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableLongVector.java
index da9b4aaeb2..158753145b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableLongVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableLongVector.java
@@ -36,6 +36,8 @@ public interface WritableLongVector extends
WritableColumnVector, LongColumnVect
*/
void setLongsFromBinary(int rowId, int count, byte[] src, int srcIndex);
+ void appendLong(long v);
+
/** Fill the column vector with the provided value. */
void fill(long value);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableShortVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableShortVector.java
index 94b70305c7..59077e0386 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableShortVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableShortVector.java
@@ -26,6 +26,8 @@ public interface WritableShortVector extends
WritableColumnVector, ShortColumnVe
/** Set short at rowId with the provided value. */
void setShort(int rowId, short value);
+ void appendShort(short v);
+
/** Fill the column vector with the provided value. */
void fill(short value);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableTimestampVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableTimestampVector.java
index e7d308b62d..586c5f092c 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableTimestampVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/writable/WritableTimestampVector.java
@@ -27,6 +27,8 @@ public interface WritableTimestampVector extends
WritableColumnVector, Timestamp
/** Set {@link Timestamp} at rowId with the provided value. */
void setTimestamp(int rowId, Timestamp timestamp);
+ void appendTimestamp(Timestamp timestamp);
+
/** Fill the column vector with the provided value. */
void fill(Timestamp value);
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java
new file mode 100644
index 0000000000..3d28183385
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/RowToColumnConverterTest.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapArrayVector;
+import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
+import org.apache.paimon.data.columnar.heap.HeapByteVector;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
+import org.apache.paimon.data.columnar.heap.HeapDoubleVector;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.data.columnar.heap.HeapLongVector;
+import org.apache.paimon.data.columnar.heap.HeapMapVector;
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.heap.HeapShortVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RowToColumnConverter}. */
+public class RowToColumnConverterTest {
+
+ @Test
+ public void testConvertIntType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new IntType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(123);
+ GenericRow row2 = GenericRow.of(456);
+
+ HeapIntVector intVector = new HeapIntVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{intVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(intVector.getInt(0)).isEqualTo(123);
+ assertThat(intVector.getInt(1)).isEqualTo(456);
+ }
+
+ @Test
+ public void testConvertNullableIntType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new IntType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of((Object) null);
+ GenericRow row2 = GenericRow.of(789);
+
+ HeapIntVector intVector = new HeapIntVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{intVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(intVector.isNullAt(0)).isTrue();
+ assertThat(intVector.getInt(1)).isEqualTo(789);
+ }
+
+ @Test
+ public void testConvertBooleanType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new BooleanType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(true);
+ GenericRow row2 = GenericRow.of(false);
+
+ HeapBooleanVector booleanVector = new HeapBooleanVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{booleanVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(booleanVector.getBoolean(0)).isTrue();
+ assertThat(booleanVector.getBoolean(1)).isFalse();
+ }
+
+ @Test
+ public void testConvertByteType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new TinyIntType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of((byte) 123);
+ GenericRow row2 = GenericRow.of((byte) 45);
+
+ HeapByteVector byteVector = new HeapByteVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{byteVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(byteVector.getByte(0)).isEqualTo((byte) 123);
+ assertThat(byteVector.getByte(1)).isEqualTo((byte) 45);
+ }
+
+ @Test
+ public void testConvertShortType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new
SmallIntType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of((short) 1234);
+ GenericRow row2 = GenericRow.of((short) 5678);
+
+ HeapShortVector shortVector = new HeapShortVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{shortVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(shortVector.getShort(0)).isEqualTo((short) 1234);
+ assertThat(shortVector.getShort(1)).isEqualTo((short) 5678);
+ }
+
+ @Test
+ public void testConvertLongType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new BigIntType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(123456789L);
+ GenericRow row2 = GenericRow.of(987654321L);
+
+ HeapLongVector longVector = new HeapLongVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{longVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(longVector.getLong(0)).isEqualTo(123456789L);
+ assertThat(longVector.getLong(1)).isEqualTo(987654321L);
+ }
+
+ @Test
+ public void testConvertFloatType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new FloatType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(123.45f);
+ GenericRow row2 = GenericRow.of(678.90f);
+
+ HeapFloatVector floatVector = new HeapFloatVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{floatVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(floatVector.getFloat(0)).isEqualTo(123.45f);
+ assertThat(floatVector.getFloat(1)).isEqualTo(678.90f);
+ }
+
+ @Test
+ public void testConvertDoubleType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new DoubleType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(123.456789D);
+ GenericRow row2 = GenericRow.of(987.654321D);
+
+ HeapDoubleVector doubleVector = new HeapDoubleVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{doubleVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(doubleVector.getDouble(0)).isEqualTo(123.456789D);
+ assertThat(doubleVector.getDouble(1)).isEqualTo(987.654321D);
+ }
+
+ @Test
+ public void testConvertStringType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new
VarCharType(10)));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(BinaryString.fromString("hello"));
+ GenericRow row2 = GenericRow.of(BinaryString.fromString("world"));
+
+ HeapBytesVector bytesVector = new HeapBytesVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{bytesVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(new
String(bytesVector.getBytes(0).getBytes())).isEqualTo("hello");
+ assertThat(new
String(bytesVector.getBytes(1).getBytes())).isEqualTo("world");
+ }
+
+ @Test
+ public void testConvertBinaryType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new BinaryType(5)));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of((Object) "hello".getBytes());
+ GenericRow row2 = GenericRow.of((Object) "world".getBytes());
+
+ HeapBytesVector bytesVector = new HeapBytesVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{bytesVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(new
String(bytesVector.getBytes(0).getBytes())).isEqualTo("hello");
+ assertThat(new
String(bytesVector.getBytes(1).getBytes())).isEqualTo("world");
+ }
+
+ @Test
+ public void testConvertVarBinaryType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new
VarBinaryType(10)));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of((Object) "hello".getBytes());
+ GenericRow row2 = GenericRow.of((Object) "world".getBytes());
+
+ HeapBytesVector bytesVector = new HeapBytesVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{bytesVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(new
String(bytesVector.getBytes(0).getBytes())).isEqualTo("hello");
+ assertThat(new
String(bytesVector.getBytes(1).getBytes())).isEqualTo("world");
+ }
+
+ @Test
+ public void testConvertDecimalType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new DecimalType(10,
2)));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ Decimal d1 = Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2);
+ Decimal d2 = Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2);
+ GenericRow row1 = GenericRow.of(d1);
+ GenericRow row2 = GenericRow.of(d2);
+
+ HeapLongVector longVector = new HeapLongVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{longVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(longVector.getLong(0)).isEqualTo(d1.toUnscaledLong());
+ assertThat(longVector.getLong(1)).isEqualTo(d2.toUnscaledLong());
+ }
+
+ @Test
+ public void testConvertTimestampType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new
TimestampType(3)));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ Timestamp timestamp1 = Timestamp.fromEpochMillis(1234567890L);
+ Timestamp timestamp2 = Timestamp.fromEpochMillis(9876543210L);
+
+ GenericRow row1 = GenericRow.of(timestamp1);
+ GenericRow row2 = GenericRow.of(timestamp2);
+
+ HeapLongVector longVector = new HeapLongVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{longVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(longVector.getLong(0)).isEqualTo(1234567890L);
+ assertThat(longVector.getLong(1)).isEqualTo(9876543210L);
+ }
+
+ @Test
+ public void testConvertDateType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new DateType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(12345);
+ GenericRow row2 = GenericRow.of(67890);
+
+ HeapIntVector intVector = new HeapIntVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{intVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(intVector.getInt(0)).isEqualTo(12345);
+ assertThat(intVector.getInt(1)).isEqualTo(67890);
+ }
+
+ @Test
+ public void testConvertTimeType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new TimeType()));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(12345);
+ GenericRow row2 = GenericRow.of(67890);
+
+ HeapIntVector intVector = new HeapIntVector(2);
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{intVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(intVector.getInt(0)).isEqualTo(12345);
+ assertThat(intVector.getInt(1)).isEqualTo(67890);
+ }
+
+ @Test
+ public void testConvertArrayType() {
+ RowType rowType = RowType.of(new DataField(0, "f", new ArrayType(new
IntType())));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow row1 = GenericRow.of(new GenericArray(new Object[] {1, 2,
3}));
+ GenericRow row2 = GenericRow.of(new GenericArray(new Object[] {4, 5,
6}));
+
+ HeapIntVector elementVector = new HeapIntVector(6);
+ HeapArrayVector arrayVector = new HeapArrayVector(2, elementVector);
+ arrayVector.putOffsetLength(0, 0, 3);
+ arrayVector.putOffsetLength(1, 3, 3);
+
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{arrayVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(elementVector.getInt(0)).isEqualTo(1);
+ assertThat(elementVector.getInt(1)).isEqualTo(2);
+ assertThat(elementVector.getInt(2)).isEqualTo(3);
+ assertThat(elementVector.getInt(3)).isEqualTo(4);
+ assertThat(elementVector.getInt(4)).isEqualTo(5);
+ assertThat(elementVector.getInt(5)).isEqualTo(6);
+
+ // test reserve
+ GenericRow row3 = GenericRow.of(new GenericArray(new Object[] {7, 8}));
+ converter.convert(row3, vectors);
+ assertThat(elementVector.getInt(6)).isEqualTo(7);
+ assertThat(elementVector.getInt(7)).isEqualTo(8);
+ }
+
+ @Test
+ public void testConvertMapType() {
+ RowType rowType =
+ RowType.of(
+ new DataField(0, "f",
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ Map<BinaryString, Integer> map1 = new LinkedHashMap<>();
+ map1.put(BinaryString.fromString("a"), 1);
+ map1.put(BinaryString.fromString("b"), 2);
+ GenericRow row1 = GenericRow.of(new GenericMap(map1));
+
+ Map<BinaryString, Integer> map2 = new LinkedHashMap<>();
+ map2.put(BinaryString.fromString("c"), 3);
+ map2.put(BinaryString.fromString("d"), 4);
+ GenericRow row2 = GenericRow.of(new GenericMap(map2));
+
+ HeapBytesVector keyVector = new HeapBytesVector(4);
+ HeapIntVector valueVector = new HeapIntVector(4);
+ HeapMapVector mapVector = new HeapMapVector(2, keyVector, valueVector);
+ mapVector.putOffsetLength(0, 0, 2);
+ mapVector.putOffsetLength(1, 2, 2);
+
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{mapVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(new
String(keyVector.getBytes(0).getBytes())).isEqualTo("a");
+ assertThat(valueVector.getInt(0)).isEqualTo(1);
+ assertThat(new
String(keyVector.getBytes(1).getBytes())).isEqualTo("b");
+ assertThat(valueVector.getInt(1)).isEqualTo(2);
+ assertThat(new
String(keyVector.getBytes(2).getBytes())).isEqualTo("c");
+ assertThat(valueVector.getInt(2)).isEqualTo(3);
+ assertThat(new
String(keyVector.getBytes(3).getBytes())).isEqualTo("d");
+ assertThat(valueVector.getInt(3)).isEqualTo(4);
+
+ // test reserve
+ Map<BinaryString, Integer> map3 = new LinkedHashMap<>();
+ map3.put(BinaryString.fromString("e"), 5);
+ GenericRow row3 = GenericRow.of(new GenericMap(map3));
+ converter.convert(row3, vectors);
+ assertThat(new
String(keyVector.getBytes(4).getBytes())).isEqualTo("e");
+ assertThat(valueVector.getInt(4)).isEqualTo(5);
+ }
+
+ @Test
+ public void testConvertRowType() {
+ RowType rowType =
+ RowType.of(
+ new DataField(
+ 0,
+ "f",
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "id",
DataTypes.INT()),
+ DataTypes.FIELD(1, "name",
DataTypes.STRING()))));
+ RowToColumnConverter converter = new RowToColumnConverter(rowType);
+
+ GenericRow rowValue1 = GenericRow.of(1,
BinaryString.fromString("Alice"));
+ GenericRow row1 = GenericRow.of(rowValue1);
+
+ GenericRow rowValue2 = GenericRow.of(2,
BinaryString.fromString("Bob"));
+ GenericRow row2 = GenericRow.of(rowValue2);
+
+ HeapIntVector idVector = new HeapIntVector(2);
+ HeapBytesVector nameVector = new HeapBytesVector(2);
+ HeapRowVector rowVector = new HeapRowVector(2, idVector, nameVector);
+
+ WritableColumnVector[] vectors = new WritableColumnVector[]
{rowVector};
+
+ converter.convert(row1, vectors);
+ converter.convert(row2, vectors);
+
+ assertThat(idVector.getInt(0)).isEqualTo(1);
+ assertThat(new
String(nameVector.getBytes(0).getBytes())).isEqualTo("Alice");
+ assertThat(idVector.getInt(1)).isEqualTo(2);
+ assertThat(new
String(nameVector.getBytes(1).getBytes())).isEqualTo("Bob");
+
+ // test reserve
+ GenericRow row3 = GenericRow.of(GenericRow.of(3,
BinaryString.fromString("Charlie")));
+ converter.convert(row3, vectors);
+ assertThat(idVector.getInt(2)).isEqualTo(3);
+ assertThat(new
String(nameVector.getBytes(2).getBytes())).isEqualTo("Charlie");
+ assertThat(rowVector.getRow(2).getInt(0)).isEqualTo(3);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
index 543005d7d2..5ac0c2e5d2 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
@@ -163,6 +163,13 @@ public class ParquetDecimalVector
}
}
+ @Override
+ public void appendByteArray(byte[] value, int offset, int length) {
+ if (vector instanceof WritableBytesVector) {
+ ((WritableBytesVector) vector).appendByteArray(value, offset,
length);
+ }
+ }
+
@Override
public void fill(byte[] value) {
if (vector instanceof WritableBytesVector) {
@@ -242,6 +249,13 @@ public class ParquetDecimalVector
}
}
+ @Override
+ public void appendLong(long v) {
+ if (vector instanceof WritableLongVector) {
+ ((WritableLongVector) vector).appendLong(v);
+ }
+ }
+
@Override
public void setLongsFromBinary(int rowId, int count, byte[] src, int
srcIndex) {
if (vector instanceof WritableLongVector) {