This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c767c61a9a [core] add columnar impl for InternalVector and link to arrow module (#7236)
c767c61a9a is described below
commit c767c61a9ab88a15ffbc12c116b56f9d390a8782
Author: ColdL <[email protected]>
AuthorDate: Mon Feb 9 11:20:46 2026 +0800
[core] add columnar impl for InternalVector and link to arrow module (#7236)
---
.../java/org/apache/paimon/arrow/ArrowUtils.java | 15 +-
.../converter/Arrow2PaimonVectorConverter.java | 112 +++++++++++++-
.../writer/ArrowFieldWriterFactoryVisitor.java | 10 +-
.../paimon/arrow/writer/ArrowFieldWriters.java | 90 +++++++++++
.../org/apache/paimon/arrow/ArrowUtilsTest.java | 10 ++
.../ArrowVectorizedBatchConverterTest.java | 164 +++++++++++++++++++++
.../paimon/arrow/vector/ArrowFormatWriterTest.java | 83 +++++++++++
.../apache/paimon/data/columnar/ColumnarArray.java | 2 +-
.../{ColumnarArray.java => ColumnarVec.java} | 73 +++++----
.../paimon/data/columnar/VecColumnVector.java | 30 ++++
.../data/columnar/VectorizedColumnBatch.java | 2 +-
.../apache/paimon/utils/VectorMappingUtils.java | 25 +++-
.../data/columnar/ColumnarRowWithVectorTest.java | 126 ++++++++++++++++
.../paimon/data/columnar/ColumnarVecTest.java | 81 ++++++++++
.../paimon/utils/VectorMappingUtilsTest.java | 20 ++-
.../paimon/format/lance/LanceReaderWriterTest.java | 50 +++++++
.../format/lance/VectorTypeWithLanceTest.java | 159 ++++++++++++++++++++
17 files changed, 1006 insertions(+), 46 deletions(-)
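
A minimal usage sketch (illustration only, not part of this commit; the class name ColumnarVecSketch is invented) of the new columnar vector classes, based on the APIs exercised in the tests in this patch (ColumnarVec.DEFAULT_FACTORY, HeapFloatVector, InternalVector#toFloatArray):

    // Illustration only: build a VECTOR(3, FLOAT) row view from a heap float vector.
    import org.apache.paimon.data.InternalVector;
    import org.apache.paimon.data.columnar.ColumnarVec;
    import org.apache.paimon.data.columnar.heap.HeapFloatVector;

    public class ColumnarVecSketch {
        public static void main(String[] args) {
            // Backing element data for two rows of a VECTOR(3, FLOAT) column.
            HeapFloatVector elements = new HeapFloatVector(6);
            float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};
            for (int i = 0; i < values.length; i++) {
                elements.setFloat(i, values[i]);
            }
            // Row 1 starts at element offset 3 and spans 3 elements; the factory
            // rejects null elements, so the resulting view has no null positions.
            InternalVector row1 = ColumnarVec.DEFAULT_FACTORY.create(elements, 3, 3);
            float[] floats = row1.toFloatArray(); // {4.0f, 5.0f, 6.0f}
        }
    }
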
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
index 7b387b409e..041927b7fb 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowUtils.java
@@ -30,6 +30,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
@@ -137,14 +138,16 @@ public class ArrowUtils {
fieldType.getDictionary(),
                        Collections.singletonMap(PARQUET_FIELD_ID, String.valueOf(fieldId)));
List<Field> children = null;
- if (dataType instanceof ArrayType) {
+ if (dataType instanceof ArrayType || dataType instanceof VectorType) {
+ final DataType elementType;
+ if (dataType instanceof VectorType) {
+ elementType = ((VectorType) dataType).getElementType();
+ } else {
+ elementType = ((ArrayType) dataType).getElementType();
+ }
Field field =
toArrowField(
- ListVector.DATA_VECTOR_NAME,
- fieldId,
- ((ArrayType) dataType).getElementType(),
- depth + 1,
- visitor);
+                            ListVector.DATA_VECTOR_NAME, fieldId, elementType, depth + 1, visitor);
FieldType typeInner = field.getFieldType();
field =
new Field(
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
index c78aa324ae..e1fe66883a 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.BooleanColumnVector;
@@ -31,6 +32,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarArray;
import org.apache.paimon.data.columnar.ColumnarMap;
import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.data.columnar.ColumnarVec;
import org.apache.paimon.data.columnar.DecimalColumnVector;
import org.apache.paimon.data.columnar.DoubleColumnVector;
import org.apache.paimon.data.columnar.FloatColumnVector;
@@ -40,6 +42,7 @@ import org.apache.paimon.data.columnar.MapColumnVector;
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.ShortColumnVector;
import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
@@ -68,6 +71,7 @@ import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
import org.apache.paimon.types.VectorType;
+import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
@@ -83,6 +87,7 @@ import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
@@ -485,7 +490,57 @@ public interface Arrow2PaimonVectorConverter {
@Override
public Arrow2PaimonVectorConverter visit(VectorType vectorType) {
-            throw new UnsupportedOperationException("Doesn't support VectorType.");
+ final Arrow2PaimonVectorConverter arrowVectorConvertor =
+ vectorType.getElementType().accept(this);
+
+ return vector ->
+ new VecColumnVector() {
+
+ private boolean inited = false;
+ private ColumnVector columnVector;
+ private ColumnarVec.Factory factory;
+
+ private void init() {
+ if (!inited) {
+ if (!(vector instanceof FixedSizeListVector)) {
+ throw new UnsupportedOperationException(
+                                        "Cannot convert " + vector.getClass() + " to vector");
+ }
+                            FixedSizeListVector listVector = (FixedSizeListVector) vector;
+                            FieldVector dataVector = listVector.getDataVector();
+ factory =
+ new Arrow2ColumnarVecFactory(
+                                            dataVector.getValidityBuffer());
+                            this.columnVector = arrowVectorConvertor.convertVector(dataVector);
+ inited = true;
+ }
+ }
+
+ @Override
+ public boolean isNullAt(int index) {
+ return vector.isNull(index);
+ }
+
+ @Override
+ public InternalVector getVector(int index) {
+ init();
+                        FixedSizeListVector listVector = (FixedSizeListVector) vector;
+ int start = listVector.getElementStartIndex(index);
+ int end = listVector.getElementEndIndex(index);
+                        return factory.create(columnVector, start, end - start);
+ }
+
+ @Override
+ public ColumnVector getColumnVector() {
+ init();
+ return columnVector;
+ }
+
+ @Override
+ public int getVectorSize() {
+ return vectorType.getLength();
+ }
+ };
}
@Override
@@ -595,5 +650,60 @@ public interface Arrow2PaimonVectorConverter {
}
};
}
+
+ private static final int[] VALIDITY_BYTE_MASK =
+            new int[] {0b0, 0b1, 0b11, 0b111, 0b1111, 0b11111, 0b111111, 0b1111111, 0b11111111};
+
+    private static class Arrow2ColumnarVecFactory extends ColumnarVec.Factory {
+
+ private final ArrowBuf validityBuf;
+ private final boolean nullable;
+
+ private Arrow2ColumnarVecFactory(ArrowBuf validityBuf) {
+ this.validityBuf = validityBuf;
+            this.nullable = validityBuf != null && validityBuf.capacity() > 0;
+ }
+
+ @Override
+        public void ensureNonNull(ColumnVector data, int offset, int numElements) {
+ if (!nullable) {
+ return;
+ }
+
+ final int startByteIndex = offset >> 3;
+ final int startBitIndex = offset & 7;
+ final int endByteIndex = (offset + numElements - 1) >> 3;
+ final int endBitIndex = (offset + numElements - 1) & 7;
+
+ if (startByteIndex == endByteIndex) {
+ byte bits = validityBuf.getByte(startByteIndex);
+ checkValidityRange(bits, startBitIndex, endBitIndex);
+ return;
+ }
+
+ byte bits = validityBuf.getByte(startByteIndex);
+ checkValidityRange(bits, startBitIndex, 7);
+ for (int i = startByteIndex + 1; i < endByteIndex; ++i) {
+ bits = validityBuf.getByte(i);
+ checkValidityAll(bits);
+ }
+ bits = validityBuf.getByte(endByteIndex);
+ checkValidityRange(bits, 0, endBitIndex);
+ }
+
+ private void checkValidityAll(byte bits) {
+ if ((bits & 0xFF) != 0xFF) {
+                throw new UnsupportedOperationException("Vector elements must be nonNull");
+ }
+ }
+
+ private void checkValidityRange(byte bits, int start, int end) {
+ int r = (bits & 0xFF) | VALIDITY_BYTE_MASK[start];
+ int m = VALIDITY_BYTE_MASK[end + 1];
+ if ((r & m) != m) {
+                throw new UnsupportedOperationException("Vector elements must be nonNull");
+ }
+ }
+ }
}
}
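
A small standalone illustration (not part of the commit; the class name ValidityMaskExample is invented) of how the validity-byte mask arithmetic above detects a null element within one Arrow validity byte, where a set bit means non-null:

    public class ValidityMaskExample {
        // Same mask table as above: MASK[k] has the k low bits set.
        static final int[] MASK = {
            0b0, 0b1, 0b11, 0b111, 0b1111, 0b11111, 0b111111, 0b1111111, 0b11111111
        };

        public static void main(String[] args) {
            // Validity byte with bit 5 clear: element 5 of this byte is null.
            int bits = 0b11010000;
            int start = 5, end = 7; // elements 5..7 fall into the checked range
            int r = (bits & 0xFF) | MASK[start]; // bits below 'start' are outside the range
            int m = MASK[end + 1];               // bits 0..end must all be set
            System.out.println((r & m) == m);    // false -> the factory throws
        }
    }
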
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
index d07762106e..ccff6d6a24 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java
@@ -44,6 +44,7 @@ import org.apache.paimon.types.VariantType;
import org.apache.paimon.types.VectorType;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
@@ -168,7 +169,14 @@ public class ArrowFieldWriterFactoryVisitor implements DataTypeVisitor<ArrowFiel
@Override
public ArrowFieldWriterFactory visit(VectorType vectorType) {
- throw new UnsupportedOperationException("Doesn't support VectorType.");
+        ArrowFieldWriterFactory elementWriterFactory = vectorType.getElementType().accept(this);
+ return (fieldVector, isNullable) ->
+ new ArrowFieldWriters.VectorWriter(
+ fieldVector,
+ vectorType.getLength(),
+ elementWriterFactory.create(
+                                ((FixedSizeListVector) fieldVector).getDataVector(), isNullable),
+ isNullable);
}
@Override
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
index 409b1f9ab2..7cb64de7fd 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.BooleanColumnVector;
@@ -40,6 +41,7 @@ import org.apache.paimon.data.columnar.MapColumnVector;
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.ShortColumnVector;
import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.PaimonShreddingUtils;
@@ -65,6 +67,7 @@ import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
@@ -711,6 +714,93 @@ public class ArrowFieldWriters {
}
}
+ /** Writer for VECTOR. */
+ public static class VectorWriter extends ArrowFieldWriter {
+
+ private final ArrowFieldWriter elementWriter;
+
+ private final int length;
+
+ public VectorWriter(
+ FieldVector fieldVector,
+ int length,
+ ArrowFieldWriter elementWriter,
+ boolean isNullable) {
+ super(fieldVector, isNullable);
+ this.length = length;
+ this.elementWriter = elementWriter;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ elementWriter.reset();
+ }
+
+ @Override
+ protected void doWrite(
+ ColumnVector columnVector,
+ @Nullable int[] pickedInColumn,
+ int startIndex,
+ int batchRows) {
+ VecColumnVector vecColumnVector = (VecColumnVector) columnVector;
+
+ if (pickedInColumn == null) {
+ elementWriter.write(
+ vecColumnVector.getColumnVector(),
+ null,
+ startIndex * length,
+ batchRows * length);
+ } else {
+ int[] childPickedInColumn = new int[batchRows * length];
+ for (int i = 0; i < batchRows; ++i) {
+                    int pickedIndexInChild = pickedInColumn[startIndex + i] * length;
+ for (int j = 0; j < length; ++j) {
+                        childPickedInColumn[i * length + j] = pickedIndexInChild + j;
+ }
+ }
+ elementWriter.write(
+ vecColumnVector.getColumnVector(),
+ childPickedInColumn,
+ 0,
+ batchRows * length);
+ }
+
+ // set FixedSizeListVector
+ FixedSizeListVector listVector = (FixedSizeListVector) fieldVector;
+ for (int i = 0; i < batchRows; i++) {
+ int row = getRowNumber(startIndex, i, pickedInColumn);
+ if (vecColumnVector.isNullAt(row)) {
+ listVector.setNull(i);
+ } else {
+ listVector.startNewValue(i);
+ }
+ }
+ }
+
+ @Override
+ protected void doWrite(int rowIndex, DataGetters getters, int pos) {
+ InternalVector vector = getters.getVector(pos);
+ if (vector.size() != length) {
+ throw new IllegalArgumentException(
+ String.format(
+                                "The size of vector %s is not equal to the length %s",
+ vector.size(), length));
+ }
+ FixedSizeListVector listVector = (FixedSizeListVector) fieldVector;
+ listVector.setNotNull(rowIndex);
+ final int rowBase = rowIndex * length;
+ for (int vectorIndex = 0; vectorIndex < length; ++vectorIndex) {
+                elementWriter.write(rowBase + vectorIndex, vector, vectorIndex);
+ }
+ // Ensure child value count is large enough.
+ listVector
+ .getDataVector()
+ .setValueCount(
+                            Math.max(listVector.getDataVector().getValueCount(), rowBase + length));
+ }
+ }
+
/** Writer for MAP. */
public static class MapWriter extends ArrowFieldWriter {
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
index 49af975150..ee613a05d7 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/ArrowUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.assertj.core.api.Assertions;
@@ -108,4 +109,13 @@ public class ArrowUtilsTest {
.getFieldType();
Assertions.assertThat(fieldType.isNullable()).isTrue();
}
+
+ @Test
+ public void testVectorType() {
+ Field field =
+                ArrowUtils.toArrowField("embed", 0, DataTypes.VECTOR(4, DataTypes.FLOAT()), 0);
+ Assertions.assertThat(field.getFieldType().getType())
+ .isEqualTo(new ArrowType.FixedSizeList(4));
+ Assertions.assertThat(field.getChildren()).hasSize(1);
+ }
}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java
new file mode 100644
index 0000000000..ec484de64b
--- /dev/null
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverterTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.arrow.converter;
+
+import org.apache.paimon.arrow.ArrowUtils;
+import org.apache.paimon.arrow.writer.ArrowFieldWriter;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.ColumnarVec;
+import org.apache.paimon.data.columnar.VecColumnVector;
+import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.reader.VectorizedRecordIterator;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.FixedSizeListVector;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.function.IntPredicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ArrowVectorizedBatchConverter}. */
+public class ArrowVectorizedBatchConverterTest {
+
+ @Test
+ public void testVectorColumnWrite() {
+ RowType rowType = RowType.of(DataTypes.VECTOR(3, DataTypes.FLOAT()));
+ try (RootAllocator allocator = new RootAllocator()) {
+            VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
+            ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType);
+
+ int length = 3;
+ int rows = 2;
+ HeapFloatVector elementVector = new HeapFloatVector(rows * length);
+ float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};
+ for (int i = 0; i < values.length; i++) {
+ elementVector.setFloat(i, values[i]);
+ }
+ VecColumnVector vector =
+                    new TestVecColumnVectorWithNulls(elementVector, length, i -> i % 2 == 1);
+
+            VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[] {vector});
+ batch.setNumRows(rows);
+
+ ArrowVectorizedBatchConverter converter =
+ new ArrowVectorizedBatchConverter(vsr, fieldWriters);
+ converter.reset(
+ new VectorizedRecordIterator() {
+ @Override
+ public VectorizedColumnBatch batch() {
+ return batch;
+ }
+
+ @Override
+ public InternalRow next() {
+ return null;
+ }
+
+ @Override
+ public void releaseBatch() {}
+ });
+ converter.next(rows);
+
+            FixedSizeListVector listVector = (FixedSizeListVector) vsr.getVector(0);
+ assertThat(listVector.isNull(0)).isFalse();
+ assertThat(listVector.isNull(1)).isTrue();
+
+ @SuppressWarnings("unchecked")
+ List<Float> row0 = (List<Float>) listVector.getObject(0);
+ assertThat(row0).containsExactly(1.0f, 2.0f, 3.0f);
+ assertThat(listVector.getObject(1)).isNull();
+
+ converter.close();
+ }
+ }
+
+ @Test
+ public void testVectorColumnWriteWithPickedInColumn() {
+ RowType rowType = RowType.of(DataTypes.VECTOR(2, DataTypes.FLOAT()));
+ try (RootAllocator allocator = new RootAllocator()) {
+            VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
+            ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType);
+
+ int length = 2;
+ int rows = 4;
+ HeapFloatVector elementVector = new HeapFloatVector(rows * length);
+            float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f};
+ for (int i = 0; i < values.length; i++) {
+ elementVector.setFloat(i, values[i]);
+ }
+
+            VecColumnVector vector = new TestVecColumnVectorWithNulls(elementVector, length, null);
+
+ int[] pickedInColumn = new int[] {2, 0};
+ fieldWriters[0].reset();
+            fieldWriters[0].write(vector, pickedInColumn, 0, pickedInColumn.length);
+
+            FixedSizeListVector listVector = (FixedSizeListVector) vsr.getVector(0);
+ @SuppressWarnings("unchecked")
+ List<Float> row0 = (List<Float>) listVector.getObject(0);
+ assertThat(row0).containsExactly(5.0f, 6.0f);
+ @SuppressWarnings("unchecked")
+ List<Float> row1 = (List<Float>) listVector.getObject(1);
+ assertThat(row1).containsExactly(1.0f, 2.0f);
+
+ vsr.close();
+ }
+ }
+
+    private static class TestVecColumnVectorWithNulls implements VecColumnVector {
+
+ private final ColumnVector data;
+ private final int length;
+ private final IntPredicate nulls;
+
+        private TestVecColumnVectorWithNulls(ColumnVector data, int length, IntPredicate nulls) {
+ this.data = data;
+ this.length = length;
+ this.nulls = nulls;
+ }
+
+ @Override
+ public InternalVector getVector(int i) {
+            return ColumnarVec.DEFAULT_FACTORY.create(data, i * length, length);
+ }
+
+ @Override
+ public ColumnVector getColumnVector() {
+ return data;
+ }
+
+ @Override
+ public int getVectorSize() {
+ return length;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return (nulls != null) && nulls.test(i);
+ }
+ }
+}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index ec1fe10c46..9b0333f376 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.arrow.converter.Arrow2PaimonVectorConverter;
import org.apache.paimon.arrow.reader.ArrowBatchReader;
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
@@ -36,6 +37,7 @@ import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
import org.apache.paimon.utils.StringUtils;
import org.apache.arrow.memory.BufferAllocator;
@@ -140,6 +142,87 @@ public class ArrowFormatWriterTest {
}
}
+ @Test
+ public void testWriteVector() {
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+                                new DataField(1, "embed", DataTypes.VECTOR(3, DataTypes.FLOAT()))));
+ float[] values = new float[] {1.0f, 2.0f, 3.0f};
+        try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 16, true)) {
+            writer.write(GenericRow.of(1, BinaryVector.fromPrimitiveArray(values)));
+
+ writer.flush();
+ VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+
+            ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, true);
+            Iterable<InternalRow> rows = arrowBatchReader.readBatch(vectorSchemaRoot);
+
+ Iterator<InternalRow> iterator = rows.iterator();
+ InternalRow row = iterator.next();
+
+ assertThat(row.getInt(0)).isEqualTo(1);
+ assertThat(row.getVector(1).toFloatArray()).isEqualTo(values);
+ vectorSchemaRoot.close();
+ }
+ }
+
+ @Test
+ public void testWriteVectorWithNulls() {
+        // Arrow2ColumnarVecFactory needs to handle bit-level checks on the validity buffer.
+ // Different lengths can cover validation across multiple bytes.
+ for (int length = 1; length <= 18; ++length) {
+            VectorType vectorType = DataTypes.VECTOR(length, DataTypes.FLOAT());
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "embed", vectorType)));
+ float[] values = new float[length];
+ for (int i = 0; i < length; ++i) {
+ values[i] = RND.nextInt(256);
+ }
+            try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 1024, true)) {
+                writer.write(GenericRow.of(1, BinaryVector.fromPrimitiveArray(values)));
+ writer.write(GenericRow.of(2, null));
+                writer.write(GenericRow.of(3, BinaryVector.fromPrimitiveArray(values)));
+ writer.write(GenericRow.of(4, null));
+
+ writer.flush();
+                VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+
+                ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, true);
+                Iterable<InternalRow> rows = arrowBatchReader.readBatch(vectorSchemaRoot);
+ Iterator<InternalRow> iterator = rows.iterator();
+
+ {
+ InternalRow row = iterator.next();
+ assertThat(row.getInt(0)).isEqualTo(1);
+ assertThat(row.isNullAt(1)).isEqualTo(false);
+                    assertThat(row.getVector(1).toFloatArray()).isEqualTo(values);
+ }
+ {
+ InternalRow row = iterator.next();
+ assertThat(row.getInt(0)).isEqualTo(2);
+ assertThat(row.isNullAt(1)).isEqualTo(true);
+ }
+ {
+ InternalRow row = iterator.next();
+ assertThat(row.getInt(0)).isEqualTo(3);
+ assertThat(row.isNullAt(1)).isEqualTo(false);
+                    assertThat(row.getVector(1).toFloatArray()).isEqualTo(values);
+ }
+ {
+ InternalRow row = iterator.next();
+ assertThat(row.getInt(0)).isEqualTo(4);
+ assertThat(row.isNullAt(1)).isEqualTo(true);
+ }
+ vectorSchemaRoot.close();
+ }
+ }
+ }
+
@Test
public void testWriteVariant() {
        RowType rowType = new RowType(Arrays.asList(new DataField(0, "v", DataTypes.VARIANT())));
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
index ad04f647b9..28221cec0d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
@@ -146,7 +146,7 @@ public final class ColumnarArray implements InternalArray, DataSetters, Serializ
@Override
public InternalVector getVector(int pos) {
-        throw new UnsupportedOperationException("Unsupported type: VectorType");
+ return ((VecColumnVector) data).getVector(offset + pos);
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java
similarity index 74%
copy from paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
copy to paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java
index ad04f647b9..01512dea78 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarVec.java
@@ -20,7 +20,6 @@ package org.apache.paimon.data.columnar;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.DataSetters;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
@@ -28,14 +27,12 @@ import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.Variant;
import java.io.Serializable;
-import java.util.Arrays;
-/** Columnar array to support access to vector column data. */
-public final class ColumnarArray implements InternalArray, DataSetters, Serializable {
+/** Columnar VectorType to support access to vector column data. */
+public final class ColumnarVec implements InternalVector, DataSetters, Serializable {
private static final long serialVersionUID = 1L;
@@ -43,12 +40,37 @@ public final class ColumnarArray implements InternalArray, DataSetters, Serializ
private final int offset;
private final int numElements;
- public ColumnarArray(ColumnVector data, int offset, int numElements) {
+ private ColumnarVec(ColumnVector data, int offset, int numElements) {
this.data = data;
this.offset = offset;
this.numElements = numElements;
}
+ public static final Factory DEFAULT_FACTORY = new Factory();
+
+    /** A Factory is used to ensure that ColumnarVec does not contain null elements. */
+ public static class Factory {
+        public final ColumnarVec create(ColumnVector data, int offset, int numElements) {
+ if (offset < 0) {
+                throw new IllegalArgumentException("Offset must be non-negative.");
+ }
+ if (numElements <= 0) {
+                throw new IllegalArgumentException("Number of elements must be positive.");
+ }
+ ensureNonNull(data, offset, numElements);
+ return new ColumnarVec(data, offset, numElements);
+ }
+
+        protected void ensureNonNull(ColumnVector data, int offset, int numElements) {
+ final int limit = offset + numElements;
+ for (int pos = offset; pos < limit; ++pos) {
+ if (data.isNullAt(pos)) {
+                    throw new UnsupportedOperationException("ColumnarVec refuse null elements.");
+ }
+ }
+ }
+ }
+
@Override
public int size() {
return numElements;
@@ -56,7 +78,8 @@ public final class ColumnarArray implements InternalArray, DataSetters, Serializ
@Override
public boolean isNullAt(int pos) {
- return data.isNullAt(offset + pos);
+ // Elements in vector must be not null.
+ return false;
}
@Override
@@ -101,62 +124,52 @@ public final class ColumnarArray implements InternalArray, DataSetters, Serializ
@Override
public BinaryString getString(int pos) {
- BytesColumnVector.Bytes byteArray = getByteArray(pos);
-        return BinaryString.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
public Decimal getDecimal(int pos, int precision, int scale) {
-        return ((DecimalColumnVector) data).getDecimal(offset + pos, precision, scale);
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
public Timestamp getTimestamp(int pos, int precision) {
-        return ((TimestampColumnVector) data).getTimestamp(offset + pos, precision);
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
public byte[] getBinary(int pos) {
- BytesColumnVector.Bytes byteArray = getByteArray(pos);
- if (byteArray.len == byteArray.data.length) {
- return byteArray.data;
- } else {
- return Arrays.copyOfRange(
-                    byteArray.data, byteArray.offset, byteArray.offset + byteArray.len);
- }
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
public Variant getVariant(int pos) {
- InternalRow row = getRow(pos, 2);
- byte[] value = row.getBinary(0);
- byte[] metadata = row.getBinary(1);
- return new GenericVariant(value, metadata);
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
public Blob getBlob(int pos) {
- return new BlobData(getBinary(pos));
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
public InternalArray getArray(int pos) {
- return ((ArrayColumnVector) data).getArray(offset + pos);
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
public InternalVector getVector(int pos) {
-        throw new UnsupportedOperationException("Unsupported type: VectorType");
+ return ((VecColumnVector) data).getVector(offset + pos);
}
@Override
public InternalMap getMap(int pos) {
- return ((MapColumnVector) data).getMap(offset + pos);
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
public InternalRow getRow(int pos, int numFields) {
- return ((RowColumnVector) data).getRow(offset + pos);
+ throw new UnsupportedOperationException("Not support the operation!");
}
@Override
@@ -267,13 +280,9 @@ public final class ColumnarArray implements InternalArray, DataSetters, Serializ
return res;
}
- private BytesColumnVector.Bytes getByteArray(int pos) {
- return ((BytesColumnVector) data).getBytes(offset + pos);
- }
-
@Override
public boolean equals(Object o) {
throw new UnsupportedOperationException(
-                "ColumnarArray do not support equals, please compare fields one by one!");
+                "ColumnarVector do not support equals, please compare fields one by one!");
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java
new file mode 100644
index 0000000000..e6df065cbf
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VecColumnVector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.InternalVector;
+
+/** Column vector for VectorType. */
+public interface VecColumnVector extends ColumnVector {
+ InternalVector getVector(int i);
+
+ ColumnVector getColumnVector();
+
+ int getVectorSize();
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
index 297d0b0f90..01c6037ca6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
@@ -126,7 +126,7 @@ public class VectorizedColumnBatch implements Serializable {
}
public InternalVector getVector(int rowId, int colId) {
-        throw new UnsupportedOperationException("Unsupported type: VectorType");
+ return ((VecColumnVector) columns[colId]).getVector(rowId);
}
public InternalRow getRow(int rowId, int colId) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java
index b7f92d06d9..99e8fd455c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.ArrayColumnVector;
@@ -39,6 +40,7 @@ import org.apache.paimon.data.columnar.MapColumnVector;
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.ShortColumnVector;
import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
@@ -357,7 +359,28 @@ public class VectorMappingUtils {
@Override
public ColumnVector visit(VectorType vectorType) {
-        throw new UnsupportedOperationException("VectorType is not supported.");
+ return new VecColumnVector() {
+ @Override
+ public InternalVector getVector(int i) {
+ return partition.getVector(index);
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return partition.isNullAt(index);
+ }
+
+ @Override
+ public int getVectorSize() {
+ return partition.getVector(index).size();
+ }
+
+ @Override
+ public ColumnVector getColumnVector() {
+ throw new UnsupportedOperationException(
+ "Doesn't support getting ColumnVector.");
+ }
+ };
}
@Override
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
new file mode 100644
index 0000000000..6a7b1727bf
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for vector access in {@link ColumnarRow}. */
+public class ColumnarRowWithVectorTest {
+
+ @Test
+ public void testVectorAccess() {
+ float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};
+ VectorizedColumnBatch batch = makeColumnBatch(values, 2, null);
+
+ ColumnarRow row = new ColumnarRow(batch);
+ row.setRowId(0);
+        assertThat(row.getVector(0).toFloatArray()).isEqualTo(new float[] {1.0f, 2.0f, 3.0f});
+
+ row.setRowId(1);
+        assertThat(row.getVector(0).toFloatArray()).isEqualTo(new float[] {4.0f, 5.0f, 6.0f});
+ }
+
+ @Test
+ public void testVectorNullable() {
+ float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};
+ boolean[] nulls = new boolean[] {true, false};
+ VectorizedColumnBatch batch = makeColumnBatch(values, 2, nulls);
+
+ ColumnarRow row = new ColumnarRow(batch);
+ row.setRowId(0);
+ assertThat(row.isNullAt(0)).isEqualTo(true);
+
+ row.setRowId(1);
+        assertThat(row.getVector(0).toFloatArray()).isEqualTo(new float[] {4.0f, 5.0f, 6.0f});
+ }
+
+ @Test
+ public void testInvalidVector() {
+ float[] values = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f};
+ VectorizedColumnBatch batch = makeColumnBatch(values, 2, null);
+ {
+ VecColumnVector vectorColumn = (VecColumnVector) batch.columns[0];
+ ((HeapFloatVector) vectorColumn.getColumnVector()).setNullAt(4);
+ }
+
+ ColumnarRow row = new ColumnarRow(batch);
+ row.setRowId(0);
+        assertThat(row.getVector(0).toFloatArray()).isEqualTo(new float[] {1.0f, 2.0f, 3.0f});
+
+ row.setRowId(1);
+ assertThat(row.isNullAt(0)).isEqualTo(false);
+ assertThatThrownBy(() -> row.getVector(0))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("refuse null elements");
+ }
+
+    private VectorizedColumnBatch makeColumnBatch(float[] values, int numRows, boolean[] nulls) {
+ assertThat(values.length % numRows).isEqualTo(0);
+ final int length = values.length / numRows;
+
+ HeapFloatVector elementVector = new HeapFloatVector(values.length);
+ for (int i = 0; i < values.length; i++) {
+ elementVector.setFloat(i, values[i]);
+ }
+
+ if (nulls != null) {
+ for (int i = 0; i < nulls.length; ++i) {
+ if (nulls[i]) {
+ for (int j = i * length; j < (i + 1) * length; ++j) {
+ elementVector.setNullAt(j);
+ }
+ }
+ }
+ }
+
+ VecColumnVector vector =
+ new VecColumnVector() {
+ @Override
+ public InternalVector getVector(int i) {
+ final int offset = i * length;
+                        return ColumnarVec.DEFAULT_FACTORY.create(elementVector, offset, length);
+ }
+
+ @Override
+ public ColumnVector getColumnVector() {
+ return elementVector;
+ }
+
+ @Override
+ public int getVectorSize() {
+ return length;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return (nulls != null) && nulls[i];
+ }
+ };
+
+        VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[] {vector});
+ batch.setNumRows(numRows);
+ return batch;
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarVecTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarVecTest.java
new file mode 100644
index 0000000000..9a280697f5
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarVecTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.data.columnar.heap.HeapIntVector;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ColumnarVec}. */
+public class ColumnarVecTest {
+
+ @Test
+ public void testIntVectorAccess() {
+ HeapIntVector intVector = new HeapIntVector(6);
+ intVector.setInt(0, 1);
+ intVector.setInt(1, 2);
+ intVector.setInt(2, 3);
+ intVector.setInt(3, 4);
+ intVector.setInt(4, 5);
+ intVector.setInt(5, 6);
+
+        ColumnarVec vector = ColumnarVec.DEFAULT_FACTORY.create(intVector, 1, 3);
+
+ assertThat(vector.size()).isEqualTo(3);
+ assertThat(vector.toIntArray()).isEqualTo(new int[] {2, 3, 4});
+ }
+
+ @Test
+ public void testFloatVectorAccess() {
+ HeapFloatVector floatVector = new HeapFloatVector(5);
+ floatVector.setFloat(0, 1.0f);
+ floatVector.setFloat(1, 2.0f);
+ floatVector.setFloat(2, 3.0f);
+ floatVector.setFloat(3, 4.0f);
+ floatVector.setFloat(4, 5.0f);
+
+        ColumnarVec vector = ColumnarVec.DEFAULT_FACTORY.create(floatVector, 2, 2);
+
+ assertThat(vector.size()).isEqualTo(2);
+ assertThat(vector.toFloatArray()).isEqualTo(new float[] {3.0f, 4.0f});
+ }
+
+ @Test
+ public void testRefuseNullElements() {
+ HeapBooleanVector floatVector = new HeapBooleanVector(5);
+ floatVector.setBoolean(0, true);
+ floatVector.setBoolean(1, false);
+ floatVector.setBoolean(2, true);
+ floatVector.setBoolean(3, false);
+ floatVector.setNullAt(4);
+
+        ColumnarVec vector = ColumnarVec.DEFAULT_FACTORY.create(floatVector, 1, 3);
+ assertThat(vector.size()).isEqualTo(3);
+        assertThat(vector.toBooleanArray()).isEqualTo(new boolean[] {false, true, false});
+
+        assertThatThrownBy(() -> ColumnarVec.DEFAULT_FACTORY.create(floatVector, 2, 3))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("refuse null elements");
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java
index 571a0d7189..34488c61e1 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.BooleanColumnVector;
@@ -33,6 +34,8 @@ import org.apache.paimon.data.columnar.IntColumnVector;
import org.apache.paimon.data.columnar.LongColumnVector;
import org.apache.paimon.data.columnar.ShortColumnVector;
import org.apache.paimon.data.columnar.TimestampColumnVector;
+import org.apache.paimon.data.columnar.VecColumnVector;
+import org.apache.paimon.data.serializer.InternalVectorSerializer;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -106,10 +109,11 @@ public class VectorMappingUtilsTest {
DataTypes.DATE(),
DataTypes.TIME(),
DataTypes.TIMESTAMP(),
- DataTypes.FLOAT())
+ DataTypes.FLOAT(),
+ DataTypes.VECTOR(3, DataTypes.FLOAT()))
.build();
- BinaryRow binaryRow = new BinaryRow(13);
+ BinaryRow binaryRow = new BinaryRow(14);
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
binaryRowWriter.writeInt(0, 0);
binaryRowWriter.writeByte(1, (byte) 1);
@@ -125,9 +129,14 @@ public class VectorMappingUtilsTest {
binaryRowWriter.writeTimestamp(
11, Timestamp.fromEpochMillis(System.currentTimeMillis()), 10);
binaryRowWriter.writeFloat(12, (float) 12.0);
+ float[] vectorValues = new float[] {1.0f, 2.0f, 3.0f};
+ InternalVectorSerializer vectorSerializer =
+                new InternalVectorSerializer(DataTypes.FLOAT(), vectorValues.length);
+        binaryRowWriter.writeVector(
+                13, BinaryVector.fromPrimitiveArray(vectorValues), vectorSerializer);
binaryRowWriter.complete();
-        int[] map = {-1, -2, -3, -4, -5, -6, 1, -7, -8, -9, -10, -11, -12, -13, 0};
+        int[] map = {-1, -2, -3, -4, -5, -6, 1, -7, -8, -9, -10, -11, -12, -13, -14, 0};
         PartitionInfo partitionInfo = new PartitionInfo(map, rowType, binaryRow);
ColumnVector[] columnVectors = new ColumnVector[1];
@@ -181,5 +190,10 @@ public class VectorMappingUtilsTest {
Assertions.assertThat(newColumnVectors[13]).isInstanceOf(FloatColumnVector.class);
        Assertions.assertThat(((FloatColumnVector) newColumnVectors[13]).getFloat(0))
.isEqualTo((float) 12.0);
+
+        Assertions.assertThat(newColumnVectors[14]).isInstanceOf(VecColumnVector.class);
+        VecColumnVector vecColumnVector = (VecColumnVector) newColumnVectors[14];
+        Assertions.assertThat(vecColumnVector.getVectorSize()).isEqualTo(vectorValues.length);
+        Assertions.assertThat(vecColumnVector.getVector(0).toFloatArray()).isEqualTo(vectorValues);
}
}
diff --git a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceReaderWriterTest.java b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceReaderWriterTest.java
index c6e5937d65..4541ca6eb6 100644
--- a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceReaderWriterTest.java
+++ b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceReaderWriterTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.lance;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
@@ -43,6 +44,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -93,4 +95,52 @@ public class LanceReaderWriterTest {
}
}
}
+
+ @Test
+    public void testWriteAndReadVector(@TempDir java.nio.file.Path tempDir) throws Exception {
+        RowType rowType = RowType.of(DataTypes.INT(), DataTypes.VECTOR(3, DataTypes.FLOAT()));
+ Options options = new Options();
+ LanceFileFormat format =
+ new LanceFileFormatFactory()
+                        .create(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+ FileIO fileIO = new LocalFileIO();
+        Path testFile = new Path(tempDir.resolve("test_vector_" + UUID.randomUUID()).toString());
+
+ float[] values1 = new float[] {1.0f, 2.0f, 3.0f};
+ float[] values2 = new float[] {4.0f, 5.0f, 6.0f};
+
+ // Write data
+ List<InternalRow> expectedRows = new ArrayList<>();
+ try (FormatWriter writer =
+ ((SupportsDirectWrite) format.createWriterFactory(rowType))
+ .create(fileIO, testFile, "")) {
+            expectedRows.add(GenericRow.of(1, BinaryVector.fromPrimitiveArray(values1)));
+            writer.addElement(expectedRows.get(0));
+            expectedRows.add(GenericRow.of(2, BinaryVector.fromPrimitiveArray(values2)));
+ writer.addElement(expectedRows.get(1));
+ }
+
+        InternalRowSerializer internalRowSerializer = new InternalRowSerializer(rowType);
+        // Read data and check
+        FormatReaderFactory readerFactory = format.createReaderFactory(rowType, rowType, null);
+ try (RecordReader<InternalRow> reader =
+ readerFactory.createReader(
+ new FormatReaderContext(
+                                fileIO, testFile, fileIO.getFileSize(testFile), null));
+                RecordReaderIterator<InternalRow> iterator = new RecordReaderIterator<>(reader)) {
+ assertNotNull(reader);
+
+ List<InternalRow> actualRows = new ArrayList<>();
+ while (iterator.hasNext()) {
+ actualRows.add(internalRowSerializer.copy(iterator.next()));
+ }
+
+ assertEquals(expectedRows.size(), actualRows.size());
+            assertEquals(expectedRows.get(0).getInt(0), actualRows.get(0).getInt(0));
+            assertArrayEquals(values1, actualRows.get(0).getVector(1).toFloatArray(), 0.0f);
+            assertEquals(expectedRows.get(1).getInt(0), actualRows.get(1).getInt(0));
+            assertArrayEquals(values2, actualRows.get(1).getVector(1).toFloatArray(), 0.0f);
+ }
+ }
}
diff --git a/paimon-lance/src/test/java/org/apache/paimon/format/lance/VectorTypeWithLanceTest.java b/paimon-lance/src/test/java/org/apache/paimon/format/lance/VectorTypeWithLanceTest.java
new file mode 100644
index 0000000000..6df0fc121a
--- /dev/null
+++ b/paimon-lance/src/test/java/org/apache/paimon/format/lance/VectorTypeWithLanceTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.lance;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.RemoteIterator;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for table with vector and lance file format. */
+public class VectorTypeWithLanceTest extends TableTestBase {
+
+ private final float[] testVector = randomVector();
+ private final AtomicInteger idGenerator = new AtomicInteger(0);
+ private final Set<Integer> nullVectorIds = new HashSet<>();
+
+ @BeforeEach
+ public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
+ database = "default";
+        warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString());
+ Options options = new Options();
+ options.set(WAREHOUSE, warehouse.toUri().toString());
+        CatalogContext context = CatalogContext.create(options, new TraceableFileIO.Loader(), null);
+ catalog = CatalogFactory.createCatalog(context);
+ catalog.createDatabase(database, true);
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(100, 1));
+
+ AtomicInteger integer = new AtomicInteger(0);
+
+ readDefault(
+ row -> {
+ integer.incrementAndGet();
+ if (integer.get() % 10 == 0) {
+                        boolean vectorIsNull = nullVectorIds.contains(row.getInt(0));
+ if (vectorIsNull) {
+ assertThat(row.isNullAt(2)).isEqualTo(true);
+ } else {
+ assertThat(row.isNullAt(2)).isEqualTo(false);
+                            assertThat(row.getVector(2).toFloatArray()).isEqualTo(testVector);
+ }
+ }
+ });
+
+ assertThat(integer.get()).isEqualTo(100);
+
+ FileStoreTable table = getTableDefault();
+ RemoteIterator<FileStatus> files =
+ table.fileIO().listFilesIterative(table.location(), true);
+ while (files.hasNext()) {
+ String file = files.next().getPath().toString();
+ if (file.endsWith(".lance")) {
+ checkFileByLanceReader(file);
+ return;
+ }
+ }
+ Assertions.fail("Do not find any lance file.");
+ }
+
+ @Override
+ protected Schema schemaDefault() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.VECTOR(testVector.length, DataTypes.FLOAT()));
+ // schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.FILE_FORMAT.key(), "lance");
+ schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none");
+ return schemaBuilder.build();
+ }
+
+ @Override
+ protected InternalRow dataDefault(int time, int size) {
+ boolean shouldNull = RANDOM.nextBoolean();
+ Integer id = idGenerator.getAndIncrement();
+        BinaryVector vector = shouldNull ? null : BinaryVector.fromPrimitiveArray(testVector);
+ if (shouldNull) {
+ nullVectorIds.add(id);
+ }
+        return GenericRow.of(id, BinaryString.fromBytes(randomBytes()), vector);
+ }
+
+ @Override
+ protected byte[] randomBytes() {
+ byte[] binary = new byte[RANDOM.nextInt(1024) + 1];
+ RANDOM.nextBytes(binary);
+ return binary;
+ }
+
+ private float[] randomVector() {
+ byte[] randomBytes = randomBytes();
+ float[] vector = new float[randomBytes.length];
+ for (int i = 0; i < vector.length; i++) {
+ vector[i] = randomBytes[i];
+ }
+ return vector;
+ }
+
+ private void checkFileByLanceReader(String path) throws Exception {
+ ArrowType expected = new ArrowType.FixedSizeList(testVector.length);
+ RootAllocator allocator = new RootAllocator();
+ Map<String, String> options = new HashMap<>();
+        try (LanceFileReader reader = LanceFileReader.open(path, options, allocator)) {
+ org.apache.arrow.vector.types.pojo.Schema schema = reader.schema();
+            org.apache.arrow.vector.types.pojo.Field field = schema.findField("f2");
+ Assertions.assertEquals(expected, field.getFieldType().getType());
+ }
+ }
+}