This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c767c61a9a [core] add columnar impl for InternalVector and link to 
arrow module (#7236)
c767c61a9a is described below

commit c767c61a9ab88a15ffbc12c116b56f9d390a8782
Author: ColdL <[email protected]>
AuthorDate: Mon Feb 9 11:20:46 2026 +0800

    [core] add columnar impl for InternalVector and link to arrow module (#7236)
---
 .../java/org/apache/paimon/arrow/ArrowUtils.java   |  15 +-
 .../converter/Arrow2PaimonVectorConverter.java     | 112 +++++++++++++-
 .../writer/ArrowFieldWriterFactoryVisitor.java     |  10 +-
 .../paimon/arrow/writer/ArrowFieldWriters.java     |  90 +++++++++++
 .../org/apache/paimon/arrow/ArrowUtilsTest.java    |  10 ++
 .../ArrowVectorizedBatchConverterTest.java         | 164 +++++++++++++++++++++
 .../paimon/arrow/vector/ArrowFormatWriterTest.java |  83 +++++++++++
 .../apache/paimon/data/columnar/ColumnarArray.java |   2 +-
 .../{ColumnarArray.java => ColumnarVec.java}       |  73 +++++----
 .../paimon/data/columnar/VecColumnVector.java      |  30 ++++
 .../data/columnar/VectorizedColumnBatch.java       |   2 +-
 .../apache/paimon/utils/VectorMappingUtils.java    |  25 +++-
 .../data/columnar/ColumnarRowWithVectorTest.java   | 126 ++++++++++++++++
 .../paimon/data/columnar/ColumnarVecTest.java      |  81 ++++++++++
 .../paimon/utils/VectorMappingUtilsTest.java       |  20 ++-
 .../paimon/format/lance/LanceReaderWriterTest.java |  50 +++++++
 .../format/lance/VectorTypeWithLanceTest.java      | 159 ++++++++++++++++++++
 17 files changed, 1006 insertions(+), 46 deletions(-)

diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index 7b387b409e..041927b7fb 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -30,6 +30,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
 
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
@@ -137,14 +138,16 @@ public class ArrowUtils {
                         fieldType.getDictionary(),
                         Collections.singletonMap(PARQUET_FIELD_ID, 
String.valueOf(fieldId)));
         List<Field> children = null;
-        if (dataType instanceof ArrayType) {
+        if (dataType instanceof ArrayType || dataType instanceof VectorType) {
+            final DataType elementType;
+            if (dataType instanceof VectorType) {
+                elementType = ((VectorType) dataType).getElementType();
+            } else {
+                elementType = ((ArrayType) dataType).getElementType();
+            }
             Field field =
                     toArrowField(
-                            ListVector.DATA_VECTOR_NAME,
-                            fieldId,
-                            ((ArrayType) dataType).getElementType(),
-                            depth + 1,
-                            visitor);
+                            ListVector.DATA_VECTOR_NAME, fieldId, elementType, 
depth + 1, visitor);
             FieldType typeInner = field.getFieldType();
             field =
                     new Field(
diff --git 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
index c78aa324ae..e1fe66883a 100644
--- 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
+++ 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.columnar.ArrayColumnVector;
 import org.apache.paimon.data.columnar.BooleanColumnVector;
@@ -31,6 +32,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.ColumnarArray;
 import org.apache.paimon.data.columnar.ColumnarMap;
 import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.data.columnar.ColumnarVec;
 import org.apache.paimon.data.columnar.DecimalColumnVector;
 import org.apache.paimon.data.columnar.DoubleColumnVector;
 import org.apache.paimon.data.columnar.FloatColumnVector;
@@ -40,6 +42,7 @@ import org.apache.paimon.data.columnar.MapColumnVector;
 import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.ShortColumnVector;
 import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
@@ -68,6 +71,7 @@ import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.types.VariantType;
 import org.apache.paimon.types.VectorType;
 
+import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
@@ -83,6 +87,7 @@ import org.apache.arrow.vector.TimeStampVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.StructVector;
 
@@ -485,7 +490,57 @@ public interface Arrow2PaimonVectorConverter {
 
         @Override
         public Arrow2PaimonVectorConverter visit(VectorType vectorType) {
-            throw new UnsupportedOperationException("Doesn't support 
VectorType.");
+            final Arrow2PaimonVectorConverter arrowVectorConvertor =
+                    vectorType.getElementType().accept(this);
+
+            return vector ->
+                    new VecColumnVector() {
+
+                        private boolean inited = false;
+                        private ColumnVector columnVector;
+                        private ColumnarVec.Factory factory;
+
+                        private void init() {
+                            if (!inited) {
+                                if (!(vector instanceof FixedSizeListVector)) {
+                                    throw new UnsupportedOperationException(
+                                            "Cannot convert " + 
vector.getClass() + " to vector");
+                                }
+                                FixedSizeListVector listVector = 
(FixedSizeListVector) vector;
+                                FieldVector dataVector = 
listVector.getDataVector();
+                                factory =
+                                        new Arrow2ColumnarVecFactory(
+                                                
dataVector.getValidityBuffer());
+                                this.columnVector = 
arrowVectorConvertor.convertVector(dataVector);
+                                inited = true;
+                            }
+                        }
+
+                        @Override
+                        public boolean isNullAt(int index) {
+                            return vector.isNull(index);
+                        }
+
+                        @Override
+                        public InternalVector getVector(int index) {
+                            init();
+                            FixedSizeListVector listVector = 
(FixedSizeListVector) vector;
+                            int start = listVector.getElementStartIndex(index);
+                            int end = listVector.getElementEndIndex(index);
+                            return factory.create(columnVector, start, end - 
start);
+                        }
+
+                        @Override
+                        public ColumnVector getColumnVector() {
+                            init();
+                            return columnVector;
+                        }
+
+                        @Override
+                        public int getVectorSize() {
+                            return vectorType.getLength();
+                        }
+                    };
         }
 
         @Override
@@ -595,5 +650,60 @@ public interface Arrow2PaimonVectorConverter {
                         }
                     };
         }
+
+        private static final int[] VALIDITY_BYTE_MASK =
+                new int[] {0b0, 0b1, 0b11, 0b111, 0b1111, 0b11111, 0b111111, 
0b1111111, 0b11111111};
+
+        private static class Arrow2ColumnarVecFactory extends 
ColumnarVec.Factory {
+
+            private final ArrowBuf validityBuf;
+            private final boolean nullable;
+
+            private Arrow2ColumnarVecFactory(ArrowBuf validityBuf) {
+                this.validityBuf = validityBuf;
+                this.nullable = validityBuf != null && validityBuf.capacity() 
> 0;
+            }
+
+            @Override
+            public void ensureNonNull(ColumnVector data, int offset, int 
numElements) {
+                if (!nullable) {
+                    return;
+                }
+
+                final int startByteIndex = offset >> 3;
+                final int startBitIndex = offset & 7;
+                final int endByteIndex = (offset + numElements - 1) >> 3;
+                final int endBitIndex = (offset + numElements - 1) & 7;
+
+                if (startByteIndex == endByteIndex) {
+                    byte bits = validityBuf.getByte(startByteIndex);
+                    checkValidityRange(bits, startBitIndex, endBitIndex);
+                    return;
+                }
+
+                byte bits = validityBuf.getByte(startByteIndex);
+                checkValidityRange(bits, startBitIndex, 7);
+                for (int i = startByteIndex + 1; i < endByteIndex; ++i) {
+                    bits = validityBuf.getByte(i);
+                    checkValidityAll(bits);
+                }
+                bits = validityBuf.getByte(endByteIndex);
+                checkValidityRange(bits, 0, endBitIndex);
+            }
+
+            private void checkValidityAll(byte bits) {
+                if ((bits & 0xFF) != 0xFF) {
+                    throw new UnsupportedOperationException("Vector elements 
must be nonNull");
+                }
+            }
+
+            private void checkValidityRange(byte bits, int start, int end) {
+                int r = (bits & 0xFF) | VALIDITY_BYTE_MASK[start];
+                int m = VALIDITY_BYTE_MASK[end + 1];
+                if ((r & m) != m) {
+                    throw new UnsupportedOperationException("Vector elements 
must be nonNull");
+                }
+            }
+        }
     }
 }
diff --git 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
index d07762106e..ccff6d6a24 100644
--- 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
+++ 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
@@ -44,6 +44,7 @@ import org.apache.paimon.types.VariantType;
 import org.apache.paimon.types.VectorType;
 
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
 
@@ -168,7 +169,14 @@ public class ArrowFieldWriterFactoryVisitor implements 
DataTypeVisitor<ArrowFiel
 
     @Override
     public ArrowFieldWriterFactory visit(VectorType vectorType) {
-        throw new UnsupportedOperationException("Doesn't support VectorType.");
+        ArrowFieldWriterFactory elementWriterFactory = 
vectorType.getElementType().accept(this);
+        return (fieldVector, isNullable) ->
+                new ArrowFieldWriters.VectorWriter(
+                        fieldVector,
+                        vectorType.getLength(),
+                        elementWriterFactory.create(
+                                ((FixedSizeListVector) 
fieldVector).getDataVector(), isNullable),
+                        isNullable);
     }
 
     @Override
diff --git 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
index 409b1f9ab2..7cb64de7fd 100644
--- 
a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
+++ 
b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.columnar.ArrayColumnVector;
 import org.apache.paimon.data.columnar.BooleanColumnVector;
@@ -40,6 +41,7 @@ import org.apache.paimon.data.columnar.MapColumnVector;
 import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.ShortColumnVector;
 import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.data.variant.PaimonShreddingUtils;
@@ -65,6 +67,7 @@ import org.apache.arrow.vector.TimeStampVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.StructVector;
@@ -711,6 +714,93 @@ public class ArrowFieldWriters {
         }
     }
 
+    /** Writer for VECTOR. */
+    public static class VectorWriter extends ArrowFieldWriter {
+
+        private final ArrowFieldWriter elementWriter;
+
+        private final int length;
+
+        public VectorWriter(
+                FieldVector fieldVector,
+                int length,
+                ArrowFieldWriter elementWriter,
+                boolean isNullable) {
+            super(fieldVector, isNullable);
+            this.length = length;
+            this.elementWriter = elementWriter;
+        }
+
+        @Override
+        public void reset() {
+            super.reset();
+            elementWriter.reset();
+        }
+
+        @Override
+        protected void doWrite(
+                ColumnVector columnVector,
+                @Nullable int[] pickedInColumn,
+                int startIndex,
+                int batchRows) {
+            VecColumnVector vecColumnVector = (VecColumnVector) columnVector;
+
+            if (pickedInColumn == null) {
+                elementWriter.write(
+                        vecColumnVector.getColumnVector(),
+                        null,
+                        startIndex * length,
+                        batchRows * length);
+            } else {
+                int[] childPickedInColumn = new int[batchRows * length];
+                for (int i = 0; i < batchRows; ++i) {
+                    int pickedIndexInChild = pickedInColumn[startIndex + i] * 
length;
+                    for (int j = 0; j < length; ++j) {
+                        childPickedInColumn[i * length + j] = 
pickedIndexInChild + j;
+                    }
+                }
+                elementWriter.write(
+                        vecColumnVector.getColumnVector(),
+                        childPickedInColumn,
+                        0,
+                        batchRows * length);
+            }
+
+            // set FixedSizeListVector
+            FixedSizeListVector listVector = (FixedSizeListVector) fieldVector;
+            for (int i = 0; i < batchRows; i++) {
+                int row = getRowNumber(startIndex, i, pickedInColumn);
+                if (vecColumnVector.isNullAt(row)) {
+                    listVector.setNull(i);
+                } else {
+                    listVector.startNewValue(i);
+                }
+            }
+        }
+
+        @Override
+        protected void doWrite(int rowIndex, DataGetters getters, int pos) {
+            InternalVector vector = getters.getVector(pos);
+            if (vector.size() != length) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The size of vector %s is not equal to the 
length %s",
+                                vector.size(), length));
+            }
+            FixedSizeListVector listVector = (FixedSizeListVector) fieldVector;
+            listVector.setNotNull(rowIndex);
+            final int rowBase = rowIndex * length;
+            for (int vectorIndex = 0; vectorIndex < length; ++vectorIndex) {
+                elementWriter.write(rowBase + vectorIndex, vector, 
vectorIndex);
+            }
+            // Ensure child value count is large enough.
+            listVector
+                    .getDataVector()
+                    .setValueCount(
+                            
Math.max(listVector.getDataVector().getValueCount(), rowBase + length));
+        }
+    }
+
     /** Writer for MAP. */
     public static class MapWriter extends ArrowFieldWriter {
 
diff --git 
a/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java 
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
index 49af975150..ee613a05d7 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.assertj.core.api.Assertions;
@@ -108,4 +109,13 @@ public class ArrowUtilsTest {
                         .getFieldType();
         Assertions.assertThat(fieldType.isNullable()).isTrue();
     }
+
+    @Test
+    public void testVectorType() {
+        Field field =
+                ArrowUtils.toArrowField("embed", 0, DataTypes.VECTOR(4, 
DataTypes.FLOAT()), 0);
+        Assertions.assertThat(field.getFieldType().getType())
+                .isEqualTo(new ArrowType.FixedSizeList(4));
+        Assertions.assertThat(field.getChildren()).hasSize(1);
+    }
 }
diff --git 
a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java
 
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java
new file mode 100644
index 0000000000..ec484de64b
--- /dev/null
+++ 
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.converter;
+
+import org.apache.paimon.arrow.ArrowUtils;
+import org.apache.paimon.arrow.writer.ArrowFieldWriter;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.ColumnarVec;
+import org.apache.paimon.data.columnar.VecColumnVector;
+import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.reader.VectorizedRecordIterator;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.function.IntPredicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ArrowVectorizedBatchConverter}. */
+public class ArrowVectorizedBatchConverterTest {
+
+    @Test
+    public void testVectorColumnWrite() {
+        RowType rowType = RowType.of(DataTypes.VECTOR(3, DataTypes.FLOAT()));
+        try (RootAllocator allocator = new RootAllocator()) {
+            VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, 
allocator);
+            ArrowFieldWriter[] fieldWriters = 
ArrowUtils.createArrowFieldWriters(vsr, rowType);
+
+            int length = 3;
+            int rows = 2;
+            HeapFloatVector elementVector = new HeapFloatVector(rows * length);
+            float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};
+            for (int i = 0; i < values.length; i++) {
+                elementVector.setFloat(i, values[i]);
+            }
+            VecColumnVector vector =
+                    new TestVecColumnVectorWithNulls(elementVector, length, i 
-> i % 2 == 1);
+
+            VectorizedColumnBatch batch = new VectorizedColumnBatch(new 
ColumnVector[] {vector});
+            batch.setNumRows(rows);
+
+            ArrowVectorizedBatchConverter converter =
+                    new ArrowVectorizedBatchConverter(vsr, fieldWriters);
+            converter.reset(
+                    new VectorizedRecordIterator() {
+                        @Override
+                        public VectorizedColumnBatch batch() {
+                            return batch;
+                        }
+
+                        @Override
+                        public InternalRow next() {
+                            return null;
+                        }
+
+                        @Override
+                        public void releaseBatch() {}
+                    });
+            converter.next(rows);
+
+            FixedSizeListVector listVector = (FixedSizeListVector) 
vsr.getVector(0);
+            assertThat(listVector.isNull(0)).isFalse();
+            assertThat(listVector.isNull(1)).isTrue();
+
+            @SuppressWarnings("unchecked")
+            List<Float> row0 = (List<Float>) listVector.getObject(0);
+            assertThat(row0).containsExactly(1.0f, 2.0f, 3.0f);
+            assertThat(listVector.getObject(1)).isNull();
+
+            converter.close();
+        }
+    }
+
+    @Test
+    public void testVectorColumnWriteWithPickedInColumn() {
+        RowType rowType = RowType.of(DataTypes.VECTOR(2, DataTypes.FLOAT()));
+        try (RootAllocator allocator = new RootAllocator()) {
+            VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, 
allocator);
+            ArrowFieldWriter[] fieldWriters = 
ArrowUtils.createArrowFieldWriters(vsr, rowType);
+
+            int length = 2;
+            int rows = 4;
+            HeapFloatVector elementVector = new HeapFloatVector(rows * length);
+            float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 
7.0f, 8.0f};
+            for (int i = 0; i < values.length; i++) {
+                elementVector.setFloat(i, values[i]);
+            }
+
+            VecColumnVector vector = new 
TestVecColumnVectorWithNulls(elementVector, length, null);
+
+            int[] pickedInColumn = new int[] {2, 0};
+            fieldWriters[0].reset();
+            fieldWriters[0].write(vector, pickedInColumn, 0, 
pickedInColumn.length);
+
+            FixedSizeListVector listVector = (FixedSizeListVector) 
vsr.getVector(0);
+            @SuppressWarnings("unchecked")
+            List<Float> row0 = (List<Float>) listVector.getObject(0);
+            assertThat(row0).containsExactly(5.0f, 6.0f);
+            @SuppressWarnings("unchecked")
+            List<Float> row1 = (List<Float>) listVector.getObject(1);
+            assertThat(row1).containsExactly(1.0f, 2.0f);
+
+            vsr.close();
+        }
+    }
+
+    private static class TestVecColumnVectorWithNulls implements 
VecColumnVector {
+
+        private final ColumnVector data;
+        private final int length;
+        private final IntPredicate nulls;
+
+        private TestVecColumnVectorWithNulls(ColumnVector data, int length, 
IntPredicate nulls) {
+            this.data = data;
+            this.length = length;
+            this.nulls = nulls;
+        }
+
+        @Override
+        public InternalVector getVector(int i) {
+            return ColumnarVec.DEFAULT_FACTORY.create(data, i * length, 
length);
+        }
+
+        @Override
+        public ColumnVector getColumnVector() {
+            return data;
+        }
+
+        @Override
+        public int getVectorSize() {
+            return length;
+        }
+
+        @Override
+        public boolean isNullAt(int i) {
+            return (nulls != null) && nulls.test(i);
+        }
+    }
+}
diff --git 
a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
 
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index ec1fe10c46..9b0333f376 100644
--- 
a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ 
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -24,6 +24,7 @@ import 
org.apache.paimon.arrow.converter.Arrow2PaimonVectorConverter;
 import org.apache.paimon.arrow.reader.ArrowBatchReader;
 import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
@@ -36,6 +37,7 @@ import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.arrow.memory.BufferAllocator;
@@ -140,6 +142,87 @@ public class ArrowFormatWriterTest {
         }
     }
 
+    @Test
+    public void testWriteVector() {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "id", DataTypes.INT()),
+                                new DataField(1, "embed", DataTypes.VECTOR(3, 
DataTypes.FLOAT()))));
+        float[] values = new float[] {1.0f, 2.0f, 3.0f};
+        try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 16, 
true)) {
+            writer.write(GenericRow.of(1, 
BinaryVector.fromPrimitiveArray(values)));
+
+            writer.flush();
+            VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+
+            ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, 
true);
+            Iterable<InternalRow> rows = 
arrowBatchReader.readBatch(vectorSchemaRoot);
+
+            Iterator<InternalRow> iterator = rows.iterator();
+            InternalRow row = iterator.next();
+
+            assertThat(row.getInt(0)).isEqualTo(1);
+            assertThat(row.getVector(1).toFloatArray()).isEqualTo(values);
+            vectorSchemaRoot.close();
+        }
+    }
+
+    @Test
+    public void testWriteVectorWithNulls() {
+        // Arrow2ColumnarVecFactory needs to handle bit-level checks on the 
validity buffer.
+        // Different lengths can cover validation across multiple bytes.
+        for (int length = 1; length <= 18; ++length) {
+            VectorType vectorType = DataTypes.VECTOR(length, 
DataTypes.FLOAT());
+            RowType rowType =
+                    new RowType(
+                            Arrays.asList(
+                                    new DataField(0, "id", DataTypes.INT()),
+                                    new DataField(1, "embed", vectorType)));
+            float[] values = new float[length];
+            for (int i = 0; i < length; ++i) {
+                values[i] = RND.nextInt(256);
+            }
+            try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 
1024, true)) {
+                writer.write(GenericRow.of(1, 
BinaryVector.fromPrimitiveArray(values)));
+                writer.write(GenericRow.of(2, null));
+                writer.write(GenericRow.of(3, 
BinaryVector.fromPrimitiveArray(values)));
+                writer.write(GenericRow.of(4, null));
+
+                writer.flush();
+                VectorSchemaRoot vectorSchemaRoot = 
writer.getVectorSchemaRoot();
+
+                ArrowBatchReader arrowBatchReader = new 
ArrowBatchReader(rowType, true);
+                Iterable<InternalRow> rows = 
arrowBatchReader.readBatch(vectorSchemaRoot);
+                Iterator<InternalRow> iterator = rows.iterator();
+
+                {
+                    InternalRow row = iterator.next();
+                    assertThat(row.getInt(0)).isEqualTo(1);
+                    assertThat(row.isNullAt(1)).isEqualTo(false);
+                    
assertThat(row.getVector(1).toFloatArray()).isEqualTo(values);
+                }
+                {
+                    InternalRow row = iterator.next();
+                    assertThat(row.getInt(0)).isEqualTo(2);
+                    assertThat(row.isNullAt(1)).isEqualTo(true);
+                }
+                {
+                    InternalRow row = iterator.next();
+                    assertThat(row.getInt(0)).isEqualTo(3);
+                    assertThat(row.isNullAt(1)).isEqualTo(false);
+                    
assertThat(row.getVector(1).toFloatArray()).isEqualTo(values);
+                }
+                {
+                    InternalRow row = iterator.next();
+                    assertThat(row.getInt(0)).isEqualTo(4);
+                    assertThat(row.isNullAt(1)).isEqualTo(true);
+                }
+                vectorSchemaRoot.close();
+            }
+        }
+    }
+
     @Test
     public void testWriteVariant() {
         RowType rowType = new RowType(Arrays.asList(new DataField(0, "v", 
DataTypes.VARIANT())));
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
index ad04f647b9..28221cec0d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
@@ -146,7 +146,7 @@ public final class ColumnarArray implements InternalArray, 
DataSetters, Serializ
 
     @Override
     public InternalVector getVector(int pos) {
-        throw new UnsupportedOperationException("Unsupported type: 
VectorType");
+        return ((VecColumnVector) data).getVector(offset + pos);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
 b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java
similarity index 74%
copy from 
paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
copy to 
paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java
index ad04f647b9..01512dea78 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java
@@ -20,7 +20,6 @@ package org.apache.paimon.data.columnar;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.DataSetters;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
@@ -28,14 +27,12 @@ import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.InternalVector;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.data.variant.Variant;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
-/** Columnar array to support access to vector column data. */
-public final class ColumnarArray implements InternalArray, DataSetters, 
Serializable {
+/** Columnar VectorType to support access to vector column data. */
+public final class ColumnarVec implements InternalVector, DataSetters, 
Serializable {
 
     private static final long serialVersionUID = 1L;
 
@@ -43,12 +40,37 @@ public final class ColumnarArray implements InternalArray, 
DataSetters, Serializ
     private final int offset;
     private final int numElements;
 
-    public ColumnarArray(ColumnVector data, int offset, int numElements) {
+    private ColumnarVec(ColumnVector data, int offset, int numElements) {
         this.data = data;
         this.offset = offset;
         this.numElements = numElements;
     }
 
+    public static final Factory DEFAULT_FACTORY = new Factory();
+
+    /**
+     * Factory for {@link ColumnarVec} that validates the requested range and
+     * rejects null elements, so a created vector never contains a null.
+     */
+    public static class Factory {
+
+        /**
+         * Creates a vector over {@code numElements} entries of {@code data}
+         * starting at {@code offset}.
+         *
+         * @param data column holding the elements
+         * @param offset index of the first element; must be non-negative
+         * @param numElements number of elements; must be positive
+         * @return a vector covering {@code [offset, offset + numElements)}
+         * @throws IllegalArgumentException if {@code offset} is negative,
+         *     {@code numElements} is not positive, or the end of the range
+         *     does not fit in an {@code int}
+         * @throws UnsupportedOperationException if the range holds a null
+         */
+        public final ColumnarVec create(
+                ColumnVector data, int offset, int numElements) {
+            if (offset < 0) {
+                throw new IllegalArgumentException(
+                        "Offset must be non-negative.");
+            }
+            if (numElements <= 0) {
+                throw new IllegalArgumentException(
+                        "Number of elements must be positive.");
+            }
+            // offset + numElements would wrap to a negative limit, which makes
+            // the null scan below a no-op and lets an invalid range through.
+            if (numElements > Integer.MAX_VALUE - offset) {
+                throw new IllegalArgumentException(
+                        "Range overflows int: offset="
+                                + offset
+                                + ", numElements="
+                                + numElements
+                                + ".");
+            }
+            ensureNonNull(data, offset, numElements);
+            return new ColumnarVec(data, offset, numElements);
+        }
+
+        /**
+         * Scans {@code [offset, offset + numElements)} and fails on the first
+         * null element. Declared protected, so a subclass can replace the
+         * check.
+         *
+         * @param data column to scan
+         * @param offset index of the first element to check
+         * @param numElements number of elements to check
+         * @throws UnsupportedOperationException if a null element is found
+         */
+        protected void ensureNonNull(
+                ColumnVector data, int offset, int numElements) {
+            final int limit = offset + numElements;
+            for (int pos = offset; pos < limit; ++pos) {
+                if (data.isNullAt(pos)) {
+                    throw new UnsupportedOperationException(
+                            "ColumnarVec refuse null elements.");
+                }
+            }
+        }
+    }
+
     @Override
     public int size() {
         return numElements;
@@ -56,7 +78,8 @@ public final class ColumnarArray implements InternalArray, 
DataSetters, Serializ
 
     @Override
     public boolean isNullAt(int pos) {
-        return data.isNullAt(offset + pos);
+        // Elements in vector must be not null.
+        return false;
     }
 
     @Override
@@ -101,62 +124,52 @@ public final class ColumnarArray implements 
InternalArray, DataSetters, Serializ
 
     @Override
     public BinaryString getString(int pos) {
-        BytesColumnVector.Bytes byteArray = getByteArray(pos);
-        return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
     public Decimal getDecimal(int pos, int precision, int scale) {
-        return ((DecimalColumnVector) data).getDecimal(offset + pos, 
precision, scale);
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
     public Timestamp getTimestamp(int pos, int precision) {
-        return ((TimestampColumnVector) data).getTimestamp(offset + pos, 
precision);
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
     public byte[] getBinary(int pos) {
-        BytesColumnVector.Bytes byteArray = getByteArray(pos);
-        if (byteArray.len == byteArray.data.length) {
-            return byteArray.data;
-        } else {
-            return Arrays.copyOfRange(
-                    byteArray.data, byteArray.offset, byteArray.offset + 
byteArray.len);
-        }
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
     public Variant getVariant(int pos) {
-        InternalRow row = getRow(pos, 2);
-        byte[] value = row.getBinary(0);
-        byte[] metadata = row.getBinary(1);
-        return new GenericVariant(value, metadata);
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(getBinary(pos));
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
     public InternalArray getArray(int pos) {
-        return ((ArrayColumnVector) data).getArray(offset + pos);
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
     public InternalVector getVector(int pos) {
-        throw new UnsupportedOperationException("Unsupported type: 
VectorType");
+        return ((VecColumnVector) data).getVector(offset + pos);
     }
 
     @Override
     public InternalMap getMap(int pos) {
-        return ((MapColumnVector) data).getMap(offset + pos);
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
     public InternalRow getRow(int pos, int numFields) {
-        return ((RowColumnVector) data).getRow(offset + pos);
+        throw new UnsupportedOperationException("Not support the operation!");
     }
 
     @Override
@@ -267,13 +280,9 @@ public final class ColumnarArray implements InternalArray, 
DataSetters, Serializ
         return res;
     }
 
-    private BytesColumnVector.Bytes getByteArray(int pos) {
-        return ((BytesColumnVector) data).getBytes(offset + pos);
-    }
-
     @Override
     public boolean equals(Object o) {
         throw new UnsupportedOperationException(
-                "ColumnarArray do not support equals, please compare fields 
one by one!");
+                "ColumnarVector do not support equals, please compare fields 
one by one!");
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java
new file mode 100644
index 0000000000..e6df065cbf
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.InternalVector;
+
+/**
+ * Column vector for VectorType.
+ *
+ * <ul>
+ *   <li>{@link #getVector(int)}: the {@link InternalVector} value at index
+ *       {@code i} of this column.
+ *   <li>{@link #getColumnVector()}: a {@link ColumnVector} tied to this
+ *       column. NOTE(review): presumably the child column that stores the
+ *       vector elements; confirm, and confirm whether implementations are
+ *       allowed to refuse this call.
+ *   <li>{@link #getVectorSize()}: an {@code int} size. NOTE(review):
+ *       presumably the number of elements in each vector; confirm.
+ * </ul>
+ */
+public interface VecColumnVector extends ColumnVector {
+    InternalVector getVector(int i);
+
+    ColumnVector getColumnVector();
+
+    int getVectorSize();
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
index 297d0b0f90..01c6037ca6 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
@@ -126,7 +126,7 @@ public class VectorizedColumnBatch implements Serializable {
     }
 
     public InternalVector getVector(int rowId, int colId) {
-        throw new UnsupportedOperationException("Unsupported type: 
VectorType");
+        return ((VecColumnVector) columns[colId]).getVector(rowId);
     }
 
     public InternalRow getRow(int rowId, int colId) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java
index b7f92d06d9..99e8fd455c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.columnar.ArrayColumnVector;
@@ -39,6 +40,7 @@ import org.apache.paimon.data.columnar.MapColumnVector;
 import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.ShortColumnVector;
 import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
@@ -357,7 +359,28 @@ public class VectorMappingUtils {
 
         @Override
         public ColumnVector visit(VectorType vectorType) {
-            throw new UnsupportedOperationException("VectorType is not 
supported.");
+            return new VecColumnVector() {
+                @Override
+                public InternalVector getVector(int i) {
+                    return partition.getVector(index);
+                }
+
+                @Override
+                public boolean isNullAt(int i) {
+                    return partition.isNullAt(index);
+                }
+
+                @Override
+                public int getVectorSize() {
+                    return partition.getVector(index).size();
+                }
+
+                @Override
+                public ColumnVector getColumnVector() {
+                    throw new UnsupportedOperationException(
+                            "Doesn't support getting ColumnVector.");
+                }
+            };
         }
 
         @Override
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
new file mode 100644
index 0000000000..6a7b1727bf
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests reading vector columns through {@link ColumnarRow}. */
+public class ColumnarRowWithVectorTest {
+
+    @Test
+    public void testVectorAccess() {
+        VectorizedColumnBatch batch = makeColumnBatch(sampleValues(), 2, null);
+        ColumnarRow row = new ColumnarRow(batch);
+
+        row.setRowId(0);
+        assertThat(row.getVector(0).toFloatArray())
+                .isEqualTo(new float[] {1.0f, 2.0f, 3.0f});
+
+        row.setRowId(1);
+        assertThat(row.getVector(0).toFloatArray())
+                .isEqualTo(new float[] {4.0f, 5.0f, 6.0f});
+    }
+
+    @Test
+    public void testVectorNullable() {
+        boolean[] rowNulls = {true, false};
+        VectorizedColumnBatch batch =
+                makeColumnBatch(sampleValues(), 2, rowNulls);
+        ColumnarRow row = new ColumnarRow(batch);
+
+        // Row 0 is flagged as null, row 1 still carries its vector.
+        row.setRowId(0);
+        assertThat(row.isNullAt(0)).isTrue();
+
+        row.setRowId(1);
+        assertThat(row.getVector(0).toFloatArray())
+                .isEqualTo(new float[] {4.0f, 5.0f, 6.0f});
+    }
+
+    @Test
+    public void testInvalidVector() {
+        VectorizedColumnBatch batch = makeColumnBatch(sampleValues(), 2, null);
+
+        // Null out one element (index 4) inside the second row's vector
+        // without flagging the row itself as null.
+        VecColumnVector vectorColumn = (VecColumnVector) batch.columns[0];
+        HeapFloatVector elements =
+                (HeapFloatVector) vectorColumn.getColumnVector();
+        elements.setNullAt(4);
+
+        ColumnarRow row = new ColumnarRow(batch);
+        row.setRowId(0);
+        assertThat(row.getVector(0).toFloatArray())
+                .isEqualTo(new float[] {1.0f, 2.0f, 3.0f});
+
+        row.setRowId(1);
+        assertThat(row.isNullAt(0)).isFalse();
+        assertThatThrownBy(() -> row.getVector(0))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("refuse null elements");
+    }
+
+    /** Six floats, enough for two rows of three elements each. */
+    private static float[] sampleValues() {
+        return new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};
+    }
+
+    /**
+     * Builds a single-column batch whose column splits {@code values} evenly
+     * into {@code numRows} vectors.
+     *
+     * @param values flattened elements; length must be a multiple of
+     *     {@code numRows}
+     * @param numRows number of rows in the batch
+     * @param nulls per-row null flags, or {@code null} for no null rows
+     */
+    private VectorizedColumnBatch makeColumnBatch(
+            float[] values, int numRows, boolean[] nulls) {
+        assertThat(values.length % numRows).isEqualTo(0);
+        final int dimension = values.length / numRows;
+
+        final HeapFloatVector elements = new HeapFloatVector(values.length);
+        for (int pos = 0; pos < values.length; pos++) {
+            elements.setFloat(pos, values[pos]);
+        }
+
+        // A null row also marks each of its elements as null.
+        if (nulls != null) {
+            for (int rowId = 0; rowId < nulls.length; ++rowId) {
+                if (!nulls[rowId]) {
+                    continue;
+                }
+                final int begin = rowId * dimension;
+                final int end = begin + dimension;
+                for (int pos = begin; pos < end; ++pos) {
+                    elements.setNullAt(pos);
+                }
+            }
+        }
+
+        VecColumnVector column =
+                new VecColumnVector() {
+                    @Override
+                    public InternalVector getVector(int i) {
+                        return ColumnarVec.DEFAULT_FACTORY.create(
+                                elements, i * dimension, dimension);
+                    }
+
+                    @Override
+                    public ColumnVector getColumnVector() {
+                        return elements;
+                    }
+
+                    @Override
+                    public int getVectorSize() {
+                        return dimension;
+                    }
+
+                    @Override
+                    public boolean isNullAt(int i) {
+                        if (nulls == null) {
+                            return false;
+                        }
+                        return nulls[i];
+                    }
+                };
+
+        VectorizedColumnBatch batch =
+                new VectorizedColumnBatch(new ColumnVector[] {column});
+        batch.setNumRows(numRows);
+        return batch;
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarVecTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarVecTest.java
new file mode 100644
index 0000000000..9a280697f5
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarVecTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.data.columnar.heap.HeapIntVector;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for {@link ColumnarVec}. */
+public class ColumnarVecTest {
+
+    @Test
+    public void testIntVectorAccess() {
+        // Backing column holds 1..6.
+        HeapIntVector column = new HeapIntVector(6);
+        for (int i = 0; i < 6; i++) {
+            column.setInt(i, i + 1);
+        }
+
+        ColumnarVec slice = ColumnarVec.DEFAULT_FACTORY.create(column, 1, 3);
+
+        assertThat(slice.size()).isEqualTo(3);
+        assertThat(slice.toIntArray()).isEqualTo(new int[] {2, 3, 4});
+    }
+
+    @Test
+    public void testFloatVectorAccess() {
+        // Backing column holds 1.0f..5.0f.
+        HeapFloatVector column = new HeapFloatVector(5);
+        for (int i = 0; i < 5; i++) {
+            column.setFloat(i, (float) (i + 1));
+        }
+
+        ColumnarVec slice = ColumnarVec.DEFAULT_FACTORY.create(column, 2, 2);
+
+        assertThat(slice.size()).isEqualTo(2);
+        assertThat(slice.toFloatArray()).isEqualTo(new float[] {3.0f, 4.0f});
+    }
+
+    @Test
+    public void testRefuseNullElements() {
+        // true, false, true, false, then a null in the last slot.
+        HeapBooleanVector column = new HeapBooleanVector(5);
+        for (int i = 0; i < 4; i++) {
+            column.setBoolean(i, i % 2 == 0);
+        }
+        column.setNullAt(4);
+
+        // Range [1, 4) stops short of the null element.
+        ColumnarVec slice = ColumnarVec.DEFAULT_FACTORY.create(column, 1, 3);
+        assertThat(slice.size()).isEqualTo(3);
+        assertThat(slice.toBooleanArray())
+                .isEqualTo(new boolean[] {false, true, false});
+
+        // Range [2, 5) includes the null element and must be rejected.
+        assertThatThrownBy(
+                        () -> ColumnarVec.DEFAULT_FACTORY.create(column, 2, 3))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("refuse null elements");
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java
index 571a0d7189..34488c61e1 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.columnar.BooleanColumnVector;
@@ -33,6 +34,8 @@ import org.apache.paimon.data.columnar.IntColumnVector;
 import org.apache.paimon.data.columnar.LongColumnVector;
 import org.apache.paimon.data.columnar.ShortColumnVector;
 import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
+import org.apache.paimon.data.serializer.InternalVectorSerializer;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
@@ -106,10 +109,11 @@ public class VectorMappingUtilsTest {
                                 DataTypes.DATE(),
                                 DataTypes.TIME(),
                                 DataTypes.TIMESTAMP(),
-                                DataTypes.FLOAT())
+                                DataTypes.FLOAT(),
+                                DataTypes.VECTOR(3, DataTypes.FLOAT()))
                         .build();
 
-        BinaryRow binaryRow = new BinaryRow(13);
+        BinaryRow binaryRow = new BinaryRow(14);
         BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
         binaryRowWriter.writeInt(0, 0);
         binaryRowWriter.writeByte(1, (byte) 1);
@@ -125,9 +129,14 @@ public class VectorMappingUtilsTest {
         binaryRowWriter.writeTimestamp(
                 11, Timestamp.fromEpochMillis(System.currentTimeMillis()), 10);
         binaryRowWriter.writeFloat(12, (float) 12.0);
+        float[] vectorValues = new float[] {1.0f, 2.0f, 3.0f};
+        InternalVectorSerializer vectorSerializer =
+                new InternalVectorSerializer(DataTypes.FLOAT(), 
vectorValues.length);
+        binaryRowWriter.writeVector(
+                13, BinaryVector.fromPrimitiveArray(vectorValues), 
vectorSerializer);
         binaryRowWriter.complete();
 
-        int[] map = {-1, -2, -3, -4, -5, -6, 1, -7, -8, -9, -10, -11, -12, 
-13, 0};
+        int[] map = {-1, -2, -3, -4, -5, -6, 1, -7, -8, -9, -10, -11, -12, 
-13, -14, 0};
         PartitionInfo partitionInfo = new PartitionInfo(map, rowType, 
binaryRow);
         ColumnVector[] columnVectors = new ColumnVector[1];
 
@@ -181,5 +190,10 @@ public class VectorMappingUtilsTest {
         
Assertions.assertThat(newColumnVectors[13]).isInstanceOf(FloatColumnVector.class);
         Assertions.assertThat(((FloatColumnVector) 
newColumnVectors[13]).getFloat(0))
                 .isEqualTo((float) 12.0);
+
+        
Assertions.assertThat(newColumnVectors[14]).isInstanceOf(VecColumnVector.class);
+        VecColumnVector vecColumnVector = (VecColumnVector) 
newColumnVectors[14];
+        
Assertions.assertThat(vecColumnVector.getVectorSize()).isEqualTo(vectorValues.length);
+        
Assertions.assertThat(vecColumnVector.getVector(0).toFloatArray()).isEqualTo(vectorValues);
     }
 }
diff --git 
a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceReaderWriterTest.java
 
b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceReaderWriterTest.java
index c6e5937d65..4541ca6eb6 100644
--- 
a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceReaderWriterTest.java
+++ 
b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceReaderWriterTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.lance;
 
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
@@ -43,6 +44,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -93,4 +95,52 @@ public class LanceReaderWriterTest {
             }
         }
     }
+
+    /** Writes two rows with a float vector column and reads them back. */
+    @Test
+    public void testWriteAndReadVector(@TempDir java.nio.file.Path tempDir)
+            throws Exception {
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(), DataTypes.VECTOR(3, DataTypes.FLOAT()));
+        LanceFileFormat format =
+                new LanceFileFormatFactory()
+                        .create(
+                                new FileFormatFactory.FormatContext(
+                                        new Options(), 1024, 1024));
+
+        FileIO fileIO = new LocalFileIO();
+        String fileName = "test_vector_" + UUID.randomUUID();
+        Path testFile = new Path(tempDir.resolve(fileName).toString());
+
+        float[][] vectors = {
+            new float[] {1.0f, 2.0f, 3.0f},
+            new float[] {4.0f, 5.0f, 6.0f}
+        };
+
+        // Row i carries id i + 1 next to vectors[i].
+        List<InternalRow> expectedRows = new ArrayList<>();
+        for (int i = 0; i < vectors.length; i++) {
+            expectedRows.add(
+                    GenericRow.of(
+                            i + 1, BinaryVector.fromPrimitiveArray(vectors[i])));
+        }
+
+        // Write data
+        SupportsDirectWrite writerFactory =
+                (SupportsDirectWrite) format.createWriterFactory(rowType);
+        try (FormatWriter writer = writerFactory.create(fileIO, testFile, "")) {
+            for (InternalRow expected : expectedRows) {
+                writer.addElement(expected);
+            }
+        }
+
+        // Read data and check
+        InternalRowSerializer rowSerializer = new InternalRowSerializer(rowType);
+        FormatReaderFactory readerFactory =
+                format.createReaderFactory(rowType, rowType, null);
+        FormatReaderContext readerContext =
+                new FormatReaderContext(
+                        fileIO, testFile, fileIO.getFileSize(testFile), null);
+        try (RecordReader<InternalRow> reader =
+                        readerFactory.createReader(readerContext);
+                RecordReaderIterator<InternalRow> iterator =
+                        new RecordReaderIterator<>(reader)) {
+            assertNotNull(reader);
+
+            // Copy each row before advancing the iterator.
+            List<InternalRow> actualRows = new ArrayList<>();
+            while (iterator.hasNext()) {
+                actualRows.add(rowSerializer.copy(iterator.next()));
+            }
+
+            assertEquals(expectedRows.size(), actualRows.size());
+            for (int i = 0; i < vectors.length; i++) {
+                InternalRow actual = actualRows.get(i);
+                assertEquals(expectedRows.get(i).getInt(0), actual.getInt(0));
+                assertArrayEquals(
+                        vectors[i], actual.getVector(1).toFloatArray(), 0.0f);
+            }
+        }
+    }
 }
diff --git 
a/paimon-lance/src/test/java/org/apache/paimon/format/lance/VectorTypeWithLanceTest.java
 
b/paimon-lance/src/test/java/org/apache/paimon/format/lance/VectorTypeWithLanceTest.java
new file mode 100644
index 0000000000..6df0fc121a
--- /dev/null
+++ 
b/paimon-lance/src/test/java/org/apache/paimon/format/lance/VectorTypeWithLanceTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.lance;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.RemoteIterator;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for tables that store a vector column in the lance file format. */
+public class VectorTypeWithLanceTest extends TableTestBase {
+
+    // Vector payload written into every generated row whose vector column is not null.
+    private final float[] testVector = randomVector();
+    // Hands out the sequential ids used as the first field of each generated row.
+    private final AtomicInteger idGenerator = new AtomicInteger(0);
+    // Ids of the generated rows whose vector field was set to null.
+    private final Set<Integer> nullVectorIds = new HashSet<>();
+
+    /**
+     * Points the test at the "default" database of a catalog whose warehouse lives under
+     * {@code tempPath} and is addressed through the {@link TraceableFileIO} scheme.
+     */
+    @BeforeEach
+    public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
+        database = "default";
+
+        String warehouseUri = TraceableFileIO.SCHEME + "://" + tempPath.toString();
+        warehouse = new Path(warehouseUri);
+
+        Options catalogOptions = new Options();
+        catalogOptions.set(WAREHOUSE, warehouse.toUri().toString());
+
+        catalog =
+                CatalogFactory.createCatalog(
+                        CatalogContext.create(catalogOptions, new TraceableFileIO.Loader(), null));
+        // NOTE(review): the boolean flag presumably means "ignore if exists" - confirm.
+        catalog.createDatabase(database, true);
+    }
+
+    /**
+     * Writes the default data set, reads it back while inspecting the vector column of every
+     * tenth row, expects 100 rows in total, and finally validates the first lance file found
+     * under the table location with the lance reader.
+     */
+    @Test
+    public void testBasic() throws Exception {
+        createTableDefault();
+        commitDefault(writeDataDefault(100, 1));
+
+        AtomicInteger rowCount = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    rowCount.incrementAndGet();
+                    // Only every tenth row that is read gets its vector column inspected.
+                    if (rowCount.get() % 10 != 0) {
+                        return;
+                    }
+                    if (nullVectorIds.contains(row.getInt(0))) {
+                        assertThat(row.isNullAt(2)).isEqualTo(true);
+                        return;
+                    }
+                    assertThat(row.isNullAt(2)).isEqualTo(false);
+                    assertThat(row.getVector(2).toFloatArray()).isEqualTo(testVector);
+                });
+        assertThat(rowCount.get()).isEqualTo(100);
+
+        // Locate the first lance data file below the table directory.
+        FileStoreTable table = getTableDefault();
+        RemoteIterator<FileStatus> statuses =
+                table.fileIO().listFilesIterative(table.location(), true);
+        String lanceFile = null;
+        while (lanceFile == null && statuses.hasNext()) {
+            String candidate = statuses.next().getPath().toString();
+            if (candidate.endsWith(".lance")) {
+                lanceFile = candidate;
+            }
+        }
+        if (lanceFile == null) {
+            Assertions.fail("Do not find any lance file.");
+        }
+        checkFileByLanceReader(lanceFile);
+    }
+
+    /**
+     * Declares the table schema: an INT column f0, a STRING column f1 and a FLOAT vector column
+     * f2 whose length equals {@code testVector.length}, with the file format option set to
+     * "lance" and the file compression option set to "none".
+     */
+    @Override
+    protected Schema schemaDefault() {
+        Schema.Builder builder = Schema.newBuilder();
+
+        // Columns.
+        builder.column("f0", DataTypes.INT());
+        builder.column("f1", DataTypes.STRING());
+        builder.column("f2", DataTypes.VECTOR(testVector.length, DataTypes.FLOAT()));
+
+        // Table options.
+        builder.option(CoreOptions.FILE_FORMAT.key(), "lance");
+        builder.option(CoreOptions.FILE_COMPRESSION.key(), "none");
+
+        return builder.build();
+    }
+
+    /**
+     * Builds one row made of the next sequential id, a string created from random bytes and, by
+     * a random choice, either the shared test vector or null. The ids of rows that received a
+     * null vector are remembered in {@code nullVectorIds}. Neither {@code time} nor {@code size}
+     * is used here.
+     */
+    @Override
+    protected InternalRow dataDefault(int time, int size) {
+        boolean nullVector = RANDOM.nextBoolean();
+        int rowId = idGenerator.getAndIncrement();
+
+        BinaryVector vector = null;
+        if (nullVector) {
+            nullVectorIds.add(rowId);
+        } else {
+            vector = BinaryVector.fromPrimitiveArray(testVector);
+        }
+        return GenericRow.of(rowId, BinaryString.fromBytes(randomBytes()), vector);
+    }
+
+    /** Returns a new array of {@code RANDOM.nextInt(1024) + 1} bytes filled by {@code RANDOM}. */
+    @Override
+    protected byte[] randomBytes() {
+        int length = RANDOM.nextInt(1024) + 1;
+        byte[] bytes = new byte[length];
+        RANDOM.nextBytes(bytes);
+        return bytes;
+    }
+
+    /** Creates a float vector by widening each byte of a fresh {@code randomBytes()} array. */
+    private float[] randomVector() {
+        byte[] source = randomBytes();
+        float[] result = new float[source.length];
+        int index = 0;
+        for (byte value : source) {
+            result[index++] = value;
+        }
+        return result;
+    }
+
+    /**
+     * Opens the given file with {@link LanceFileReader} and asserts that field f2 of its Arrow
+     * schema is a fixed-size list whose size equals {@code testVector.length}.
+     *
+     * @param path location of the lance file to inspect
+     * @throws Exception if the file cannot be opened or a resource fails to close
+     */
+    private void checkFileByLanceReader(String path) throws Exception {
+        ArrowType expected = new ArrowType.FixedSizeList(testVector.length);
+        Map<String, String> options = new HashMap<>();
+        // The allocator is a resource as well; declaring it first means it is closed after the
+        // reader, so it is no longer leaked when the check finishes or fails.
+        try (RootAllocator allocator = new RootAllocator();
+                LanceFileReader reader = LanceFileReader.open(path, options, allocator)) {
+            org.apache.arrow.vector.types.pojo.Schema schema = reader.schema();
+            org.apache.arrow.vector.types.pojo.Field field = schema.findField("f2");
+            Assertions.assertEquals(expected, field.getFieldType().getType());
+        }
+    }
+}

Reply via email to