This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9708d745fb [flink] add flink support for VectorType (#7238)
9708d745fb is described below
commit 9708d745fb1dfc5318c61ca6737ce0f1f44b0952
Author: ColdL <[email protected]>
AuthorDate: Mon Feb 9 15:21:30 2026 +0800
[flink] add flink support for VectorType (#7238)
---
.../apache/paimon/flink/DataTypeToLogicalType.java | 3 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 25 ++-
.../org/apache/paimon/flink/FlinkRowWrapper.java | 10 +-
.../apache/paimon/flink/LogicalTypeConversion.java | 15 ++
.../paimon/flink/FlinkRowDataWithVectorTest.java | 51 ++++++
.../flink/FlinkRowWrapperWithVectorTest.java | 48 ++++++
.../paimon/flink/LogicalTypeConversionTest.java | 59 +++++++
.../apache/paimon/flink/VectorTypeTableITCase.java | 190 +++++++++++++++++++++
.../flink/utils/InternalRowSerializerTest.java | 25 +++
9 files changed, 420 insertions(+), 6 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
index 6fc3016d4f..16a270587d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
@@ -162,7 +162,8 @@ public class DataTypeToLogicalType implements
DataTypeVisitor<LogicalType> {
@Override
public LogicalType visit(VectorType vectorType) {
- throw new UnsupportedOperationException("Not support VectorType yet.");
+ // Represent a Paimon vector as a Flink array of the converted element type, keeping the
+ // vector's nullability. The ArrayType built here is given no dimension.
+ return new org.apache.flink.table.types.logical.ArrayType(
+ vectorType.isNullable(),
vectorType.getElementType().accept(this));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index de0d4e3f26..c0eddff646 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -155,6 +155,7 @@ import static
org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_I
import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.LogicalTypeConversion.toVectorType;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
@@ -1040,14 +1041,32 @@ public class FlinkCatalog extends AbstractCatalog {
field ->
schemaBuilder.column(
field.getName(),
- blobFields.contains(field.getName())
- ? toBlobType(field.getType())
- : toDataType(field.getType()),
+ resolveDataType(field.getName(),
field.getType(), options),
columnComments.get(field.getName())));
return schemaBuilder.build();
}
+    /**
+     * Picks the Paimon type for one column of a Flink table definition.
+     *
+     * <p>Columns listed by {@code CoreOptions.blobField(options)} become blob types. An array
+     * column for which the option {@code field.<name>.vector-dim} is present becomes a vector
+     * type with that dimension. Every other column uses the regular {@code toDataType} mapping.
+     *
+     * @param fieldName name of the column
+     * @param logicalType Flink logical type declared for the column
+     * @param options table options
+     * @return the resolved Paimon data type
+     */
+    private static org.apache.paimon.types.DataType resolveDataType(
+            String fieldName,
+            org.apache.flink.table.types.logical.LogicalType logicalType,
+            Map<String, String> options) {
+        if (CoreOptions.blobField(options).contains(fieldName)) {
+            return toBlobType(logicalType);
+        }
+
+        boolean isArray = logicalType instanceof org.apache.flink.table.types.logical.ArrayType;
+        String dimOption =
+                isArray ? options.get(String.format("field.%s.vector-dim", fieldName)) : null;
+        if (dimOption == null) {
+            // Not an array, or no dimension configured: use the default mapping.
+            return toDataType(logicalType);
+        }
+
+        org.apache.flink.table.types.logical.ArrayType arrayType =
+                (org.apache.flink.table.types.logical.ArrayType) logicalType;
+        return toVectorType(arrayType.getElementType(), dimOption);
+    }
+
private static Map<String, String> getColumnComments(CatalogBaseTable
catalogTable) {
return catalogTable.getUnresolvedSchema().getColumns().stream()
.filter(c -> c.getComment().isPresent())
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index ae8a4fc592..3201f594fe 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -163,7 +163,7 @@ public class FlinkRowWrapper implements InternalRow {
@Override
public InternalVector getVector(int pos) {
- throw new UnsupportedOperationException("Not support VectorType yet.");
+ // Expose the Flink array stored at this row position through the InternalVector interface.
+ return new FlinkVectorWrapper(row.getArray(pos));
}
@Override
@@ -268,7 +268,7 @@ public class FlinkRowWrapper implements InternalRow {
@Override
public InternalVector getVector(int pos) {
- throw new UnsupportedOperationException("Not support VectorType
yet.");
+ // Expose the nested Flink array at this element position through the InternalVector interface.
+ return new FlinkVectorWrapper(array.getArray(pos));
}
@Override
@@ -317,6 +317,12 @@ public class FlinkRowWrapper implements InternalRow {
}
}
+    /**
+     * Adapter that lets a Flink {@code ArrayData} be read as a Paimon {@link InternalVector}.
+     * It adds no members of its own beyond the constructor; everything else comes from
+     * {@link FlinkArrayWrapper}.
+     */
+    private static class FlinkVectorWrapper extends FlinkArrayWrapper
+            implements InternalVector {
+
+        private FlinkVectorWrapper(org.apache.flink.table.data.ArrayData flinkArray) {
+            super(flinkArray);
+        }
+    }
+
private static class FlinkMapWrapper implements InternalMap {
private final org.apache.flink.table.data.MapData map;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
index 8c7779e318..c83e85d6bc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
@@ -20,7 +20,9 @@ package org.apache.paimon.flink;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -49,6 +51,19 @@ public class LogicalTypeConversion {
return new BlobType();
}
+    /**
+     * Builds a Paimon {@link VectorType} from a Flink element type and a textual dimension.
+     *
+     * @param elementType Flink logical type of the vector elements
+     * @param vectorDim dimension as a decimal integer string, must not be null; leading and
+     *     trailing whitespace is ignored
+     * @return the vector type created by {@code DataTypes.VECTOR}
+     * @throws IllegalArgumentException if {@code vectorDim} is blank or is not an integer
+     */
+    public static VectorType toVectorType(LogicalType elementType, String vectorDim) {
+        // Parse the same trimmed text that the blank check inspects; Integer.parseInt itself
+        // does not tolerate surrounding whitespace.
+        String trimmedDim = vectorDim.trim();
+        checkArgument(
+                !trimmedDim.isEmpty(),
+                "Expected an integer for vector-dim, but got empty value.");
+
+        int dim;
+        try {
+            dim = Integer.parseInt(trimmedDim);
+        } catch (NumberFormatException e) {
+            // Keep the parse failure as the cause so the root problem stays visible.
+            throw new IllegalArgumentException(
+                    "Expected an integer for vector-dim, but got: " + vectorDim, e);
+        }
+
+        // NOTE(review): range validation of dim is delegated to DataTypes.VECTOR; confirm that
+        // it rejects non-positive values.
+        return DataTypes.VECTOR(dim, toDataType(elementType));
+    }
+
public static RowType
toDataType(org.apache.flink.table.types.logical.RowType logicalType) {
return (RowType) toDataType(logicalType, new AtomicInteger(-1));
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowDataWithVectorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowDataWithVectorTest.java
new file mode 100644
index 0000000000..ba8542d395
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowDataWithVectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.BinaryVector;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+
+import org.apache.flink.table.data.ArrayData;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks how {@link FlinkRowData} exposes Paimon vector fields. */
+public class FlinkRowDataWithVectorTest {
+
+    /** A vector field of the wrapped row can be read back as Flink array data. */
+    @Test
+    public void testVectorAsArrayData() {
+        float[] expected = {1.0f, 2.0f, 3.0f};
+        InternalRow paimonRow = GenericRow.of(1, BinaryVector.fromPrimitiveArray(expected));
+
+        ArrayData embed = new FlinkRowData(paimonRow).getArray(1);
+
+        assertThat(embed.toFloatArray()).isEqualTo(expected);
+    }
+
+    /** A null value in the vector position is reported as null. */
+    @Test
+    public void testNullVector() {
+        InternalRow paimonRow = GenericRow.of(1, null);
+
+        assertThat(new FlinkRowData(paimonRow).isNullAt(1)).isTrue();
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperWithVectorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperWithVectorTest.java
new file mode 100644
index 0000000000..234b453a41
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperWithVectorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks vector access through {@link FlinkRowWrapper}. */
+public class FlinkRowWrapperWithVectorTest {
+
+    /** An array field of the Flink row can be read through {@code getVector}. */
+    @Test
+    public void testVectorAccess() {
+        GenericRowData flinkRow = new GenericRowData(2);
+        flinkRow.setField(0, 1);
+        flinkRow.setField(1, new GenericArrayData(new float[] {1.0f, 2.0f, 3.0f}));
+
+        float[] actual = new FlinkRowWrapper(flinkRow).getVector(1).toFloatArray();
+
+        assertThat(actual).isEqualTo(new float[] {1.0f, 2.0f, 3.0f});
+    }
+
+    /** A null field is reported as null by the wrapper. */
+    @Test
+    public void testNullVector() {
+        GenericRowData flinkRow = new GenericRowData(1);
+        flinkRow.setField(0, null);
+
+        assertThat(new FlinkRowWrapper(flinkRow).isNullAt(0)).isTrue();
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java
new file mode 100644
index 0000000000..142b53c7cb
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.VectorType;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for the vector-related conversions of {@link LogicalTypeConversion}. */
+public class LogicalTypeConversionTest {
+
+    /** A valid dimension string yields the matching vector type. */
+    @Test
+    public void testToVectorType() {
+        VectorType expected = DataTypes.VECTOR(3, DataTypes.FLOAT());
+
+        VectorType actual = LogicalTypeConversion.toVectorType(new FloatType(), "3");
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    /** Empty, non-numeric and zero dimensions are all rejected. */
+    @Test
+    public void testToVectorTypeInvalidDim() {
+        for (String invalidDim : new String[] {"", "abc", "0"}) {
+            assertThatThrownBy(
+                            () -> LogicalTypeConversion.toVectorType(new FloatType(), invalidDim))
+                    .isInstanceOf(IllegalArgumentException.class);
+        }
+    }
+
+    /** A Paimon vector type converts to a Flink array type with the same element type. */
+    @Test
+    public void testVectorTypeToLogicalType() {
+        LogicalType converted =
+                LogicalTypeConversion.toLogicalType(DataTypes.VECTOR(4, DataTypes.FLOAT()));
+
+        assertThat(converted).isInstanceOf(ArrayType.class);
+        LogicalType elementType = ((ArrayType) converted).getElementType();
+        assertThat(elementType).isInstanceOf(FloatType.class);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java
new file mode 100644
index 0000000000..8c918430e6
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.ArrayUtils;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
+/** Test write and read table with vector type. */
+public class VectorTypeTableITCase extends CatalogITCaseBase {
+
+ private static final Random RANDOM = new Random();
+
+ private final String testTblName = "vector_table";
+
+ private final float[] testVector = randomVector();
+
+    /** Returns the single CREATE TABLE statement for the vector test table. */
+    @Override
+    protected List<String> ddl() {
+        String createTable = getCreateTableDdl();
+        return Collections.singletonList(createTable);
+    }
+
+    /**
+     * Writes two rows carrying the test vector, then verifies them through Flink SQL and through
+     * the Paimon read API, and finally checks the table schema.
+     */
+    @Test
+    public void testBasic() throws Exception {
+        int[] ids = {1, 2};
+
+        batchSql("SELECT * FROM %s", testTblName);
+        for (int id : ids) {
+            batchSql("INSERT INTO %s VALUES %s", testTblName, makeValueStr(id, false));
+        }
+
+        { // Check by Flink SQL.
+            List<Row> rows = batchSql("SELECT * FROM %s ORDER BY id ASC", testTblName);
+            Assertions.assertEquals(ids.length, rows.size());
+
+            for (int i = 0; i < ids.length; i++) {
+                Row row = rows.get(i);
+                Assertions.assertEquals(ids[i], (int) row.getFieldAs("id"));
+                Assertions.assertEquals("paimon", row.getFieldAs("data"));
+                Assertions.assertArrayEquals(
+                        ArrayUtils.toObject(testVector), row.getFieldAs("embed"));
+            }
+        }
+
+        { // Check by Paimon API.
+            List<InternalRow> rows = innerReadData();
+            Assertions.assertEquals(ids.length, rows.size());
+            // The read API gives no ordering here, so sort by id before comparing.
+            rows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+
+            for (int i = 0; i < ids.length; i++) {
+                InternalRow row = rows.get(i);
+                Assertions.assertEquals(ids[i], row.getInt(0));
+                Assertions.assertEquals("paimon", row.getString(1).toString());
+                Assertions.assertArrayEquals(testVector, row.getVector(2).toFloatArray());
+            }
+        }
+
+        checkTableSchema();
+    }
+
+    /**
+     * Writes four rows, alternating between the test vector (odd ids) and a NULL vector (even
+     * ids), then verifies every row through Flink SQL and through the Paimon read API.
+     */
+    @Test
+    public void testNullValues() throws Exception {
+        int rowCount = 4;
+
+        batchSql("SELECT * FROM %s", testTblName);
+        for (int id = 1; id <= rowCount; id++) {
+            boolean nullVector = id % 2 == 0;
+            batchSql("INSERT INTO %s VALUES %s", testTblName, makeValueStr(id, nullVector));
+        }
+
+        { // Check by Flink SQL.
+            List<Row> rows = batchSql("SELECT * FROM %s ORDER BY id ASC", testTblName);
+            Assertions.assertEquals(rowCount, rows.size());
+
+            for (int i = 0; i < rowCount; i++) {
+                int id = i + 1;
+                Row row = rows.get(i);
+                Assertions.assertEquals(id, (int) row.getFieldAs("id"));
+                Assertions.assertEquals("paimon", row.getFieldAs("data"));
+                if (id % 2 == 0) {
+                    Assertions.assertNull(row.getFieldAs("embed"));
+                } else {
+                    Assertions.assertNotNull(row.getFieldAs("embed"));
+                    Assertions.assertArrayEquals(
+                            ArrayUtils.toObject(testVector), row.getFieldAs("embed"));
+                }
+            }
+        }
+
+        { // Check by Paimon API.
+            List<InternalRow> rows = innerReadData();
+            Assertions.assertEquals(rowCount, rows.size());
+            // The read API gives no ordering here, so sort by id before comparing.
+            rows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+
+            for (int i = 0; i < rowCount; i++) {
+                int id = i + 1;
+                InternalRow row = rows.get(i);
+                Assertions.assertEquals(id, row.getInt(0));
+                Assertions.assertEquals("paimon", row.getString(1).toString());
+                if (id % 2 == 0) {
+                    Assertions.assertTrue(row.isNullAt(2));
+                } else {
+                    Assertions.assertFalse(row.isNullAt(2));
+                    Assertions.assertArrayEquals(testVector, row.getVector(2).toFloatArray());
+                }
+            }
+        }
+
+        checkTableSchema();
+    }
+
+    /**
+     * Asserts that the stored table schema is (INT, STRING, VECTOR of FLOAT) and that the vector
+     * dimension equals the length of the test vector.
+     */
+    private void checkTableSchema() throws Exception {
+        DataType[] expectedTypes = {
+            DataTypes.INT(),
+            DataTypes.STRING(),
+            DataTypes.VECTOR(testVector.length, DataTypes.FLOAT())
+        };
+
+        List<DataField> fields = paimonTable(testTblName).schema().fields();
+
+        Assertions.assertEquals(expectedTypes.length, fields.size());
+        for (int i = 0; i < expectedTypes.length; i++) {
+            Assertions.assertEquals(expectedTypes[i], fields.get(i).type());
+        }
+    }
+
+    /**
+     * Reads all rows of the test table through the Paimon read API.
+     *
+     * <p>The read fails as soon as ten rows have been collected.
+     *
+     * <p>NOTE(review): rows are stored without copying; this assumes the reader does not reuse
+     * row instances, which should be confirmed.
+     */
+    private List<InternalRow> innerReadData() throws Exception {
+        ReadBuilder readBuilder = paimonTable(testTblName).newReadBuilder();
+        List<InternalRow> collected = new ArrayList<>();
+
+        RecordReader<InternalRow> recordReader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        recordReader.forEachRemaining(
+                row -> {
+                    collected.add(row);
+                    Assertions.assertTrue(collected.size() < 10);
+                });
+
+        return collected;
+    }
+
+    /**
+     * Builds the CREATE TABLE statement for the test table: an INT id, a STRING payload and an
+     * ARRAY&lt;FLOAT&gt; column whose {@code field.embed.vector-dim} option is set to the length
+     * of the test vector.
+     */
+    private String getCreateTableDdl() {
+        String template =
+                "CREATE TABLE IF NOT EXISTS `%s` ("
+                        + " `id` INT,"
+                        + " `data` STRING,"
+                        + " `embed` ARRAY<FLOAT>"
+                        + ") WITH ("
+                        + " 'file.format' = 'json',"
+                        + " 'file.compression' = 'none',"
+                        + " 'field.embed.vector-dim' = '%d'"
+                        + ")";
+        return String.format(template, testTblName, testVector.length);
+    }
+
+    /**
+     * Renders one SQL VALUES tuple {@code (id, 'paimon', vector)}.
+     *
+     * @param id value of the id column
+     * @param nullVector whether to write a typed NULL instead of the test vector
+     */
+    private String makeValueStr(int id, boolean nullVector) {
+        String vectorLiteral;
+        if (nullVector) {
+            vectorLiteral = "CAST(NULL AS ARRAY<FLOAT>)";
+        } else {
+            // Arrays.toString yields "[a, b, c]", which completes the ARRAY[...] constructor.
+            vectorLiteral = "ARRAY" + Arrays.toString(testVector);
+        }
+        return String.format("(%d, '%s', %s)", id, "paimon", vectorLiteral);
+    }
+
+    /**
+     * Creates a vector of 1 to 1024 elements; each element is a random byte value widened to
+     * float.
+     */
+    private float[] randomVector() {
+        int dimension = RANDOM.nextInt(1024) + 1;
+        byte[] source = new byte[dimension];
+        RANDOM.nextBytes(source);
+
+        float[] result = new float[dimension];
+        int index = 0;
+        for (byte value : source) {
+            result[index++] = value;
+        }
+        return result;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
index c9a8c3e997..58f0119d45 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.utils;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.DataType;
@@ -82,6 +83,30 @@ public class InternalRowSerializerTest {
Assertions.assertThat(row.getLong(2)).isEqualTo(row1.getLong(2));
}
+    /** A row holding a vector field survives a serialize/deserialize round trip. */
+    @Test
+    public void testSerializeVector() throws Exception {
+        float[] expected = {1.0f, 2.0f, 3.0f};
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("embed", DataTypes.VECTOR(3, DataTypes.FLOAT()))
+                        .build();
+        DataType[] fieldTypes = rowType.getFieldTypes().toArray(new DataType[0]);
+        InternalRowTypeSerializer serializer = new InternalRowTypeSerializer(fieldTypes);
+
+        InternalRow original = GenericRow.of(1, BinaryVector.fromPrimitiveArray(expected));
+        DataOutputSerializer output = new DataOutputSerializer(100);
+        serializer.serialize(original, output);
+        InternalRow restored =
+                serializer.deserialize(new DataInputDeserializer(output.wrapAsByteBuffer()));
+
+        Assertions.assertThat(restored.getInt(0)).isEqualTo(1);
+        Assertions.assertThat(restored.getVector(1).toFloatArray()).isEqualTo(expected);
+    }
+
private BinaryString randomString() {
int length = RANDOM.nextInt(100);
byte[] buffer = new byte[length];