This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9708d745fb [flink] add flink support for VectorType (#7238)
9708d745fb is described below

commit 9708d745fb1dfc5318c61ca6737ce0f1f44b0952
Author: ColdL <[email protected]>
AuthorDate: Mon Feb 9 15:21:30 2026 +0800

    [flink] add flink support for VectorType (#7238)
---
 .../apache/paimon/flink/DataTypeToLogicalType.java |   3 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  25 ++-
 .../org/apache/paimon/flink/FlinkRowWrapper.java   |  10 +-
 .../apache/paimon/flink/LogicalTypeConversion.java |  15 ++
 .../paimon/flink/FlinkRowDataWithVectorTest.java   |  51 ++++++
 .../flink/FlinkRowWrapperWithVectorTest.java       |  48 ++++++
 .../paimon/flink/LogicalTypeConversionTest.java    |  59 +++++++
 .../apache/paimon/flink/VectorTypeTableITCase.java | 190 +++++++++++++++++++++
 .../flink/utils/InternalRowSerializerTest.java     |  25 +++
 9 files changed, 420 insertions(+), 6 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
index 6fc3016d4f..16a270587d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java
@@ -162,7 +162,8 @@ public class DataTypeToLogicalType implements DataTypeVisitor<LogicalType> {
 
     @Override
     public LogicalType visit(VectorType vectorType) {
-        throw new UnsupportedOperationException("Not support VectorType yet.");
+        return new org.apache.flink.table.types.logical.ArrayType(
+                vectorType.isNullable(), vectorType.getElementType().accept(this));
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index de0d4e3f26..c0eddff646 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -155,6 +155,7 @@ import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_I
 import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.LogicalTypeConversion.toVectorType;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
 import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
@@ -1040,14 +1041,32 @@ public class FlinkCatalog extends AbstractCatalog {
                         field ->
                                 schemaBuilder.column(
                                         field.getName(),
-                                        blobFields.contains(field.getName())
-                                                ? toBlobType(field.getType())
-                                                : toDataType(field.getType()),
+                                        resolveDataType(field.getName(), field.getType(), options),
                                         columnComments.get(field.getName())));
 
         return schemaBuilder.build();
     }
 
+    private static org.apache.paimon.types.DataType resolveDataType(
+            String fieldName,
+            org.apache.flink.table.types.logical.LogicalType logicalType,
+            Map<String, String> options) {
+        List<String> blobFields = CoreOptions.blobField(options);
+        if (blobFields.contains(fieldName)) {
+            return toBlobType(logicalType);
+        }
+        if (logicalType instanceof org.apache.flink.table.types.logical.ArrayType) {
+            String vectorDim = options.get(String.format("field.%s.vector-dim", fieldName));
+            if (vectorDim != null) {
+                org.apache.flink.table.types.logical.LogicalType elementType =
+                        ((org.apache.flink.table.types.logical.ArrayType) logicalType)
+                                .getElementType();
+                return toVectorType(elementType, vectorDim);
+            }
+        }
+        return toDataType(logicalType);
+    }
+
     private static Map<String, String> getColumnComments(CatalogBaseTable catalogTable) {
         return catalogTable.getUnresolvedSchema().getColumns().stream()
                 .filter(c -> c.getComment().isPresent())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index ae8a4fc592..3201f594fe 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -163,7 +163,7 @@ public class FlinkRowWrapper implements InternalRow {
 
     @Override
     public InternalVector getVector(int pos) {
-        throw new UnsupportedOperationException("Not support VectorType yet.");
+        return new FlinkVectorWrapper(row.getArray(pos));
     }
 
     @Override
@@ -268,7 +268,7 @@ public class FlinkRowWrapper implements InternalRow {
 
         @Override
         public InternalVector getVector(int pos) {
-            throw new UnsupportedOperationException("Not support VectorType yet.");
+            return new FlinkVectorWrapper(array.getArray(pos));
         }
 
         @Override
@@ -317,6 +317,12 @@ public class FlinkRowWrapper implements InternalRow {
         }
     }
 
+    private static class FlinkVectorWrapper extends FlinkArrayWrapper implements InternalVector {
+        private FlinkVectorWrapper(org.apache.flink.table.data.ArrayData array) {
+            super(array);
+        }
+    }
+
     private static class FlinkMapWrapper implements InternalMap {
 
         private final org.apache.flink.table.data.MapData map;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
index 8c7779e318..c83e85d6bc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
@@ -20,7 +20,9 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
 
 import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -49,6 +51,19 @@ public class LogicalTypeConversion {
         return new BlobType();
     }
 
+    public static VectorType toVectorType(LogicalType elementType, String vectorDim) {
+        checkArgument(
+                !vectorDim.trim().isEmpty(),
+                "Expected an integer for vector-dim, but got empty value.");
+        try {
+            int dim = Integer.parseInt(vectorDim);
+            return DataTypes.VECTOR(dim, toDataType(elementType));
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    "Expected an integer for vector-dim, but got: " + vectorDim);
+        }
+    }
+
     public static RowType toDataType(org.apache.flink.table.types.logical.RowType logicalType) {
         return (RowType) toDataType(logicalType, new AtomicInteger(-1));
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowDataWithVectorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowDataWithVectorTest.java
new file mode 100644
index 0000000000..ba8542d395
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowDataWithVectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.BinaryVector;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+
+import org.apache.flink.table.data.ArrayData;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FlinkRowData} and vector type. */
+public class FlinkRowDataWithVectorTest {
+
+    @Test
+    public void testVectorAsArrayData() {
+        float[] values = new float[] {1.0f, 2.0f, 3.0f};
+        InternalRow row = GenericRow.of(1, BinaryVector.fromPrimitiveArray(values));
+
+        FlinkRowData rowData = new FlinkRowData(row);
+        ArrayData arrayData = rowData.getArray(1);
+
+        assertThat(arrayData.toFloatArray()).isEqualTo(values);
+    }
+
+    @Test
+    public void testNullVector() {
+        InternalRow row = GenericRow.of(1, null);
+        FlinkRowData rowData = new FlinkRowData(row);
+
+        assertThat(rowData.isNullAt(1)).isTrue();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperWithVectorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperWithVectorTest.java
new file mode 100644
index 0000000000..234b453a41
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperWithVectorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FlinkRowWrapper} and vector type. */
+public class FlinkRowWrapperWithVectorTest {
+
+    @Test
+    public void testVectorAccess() {
+        GenericRowData row = new GenericRowData(2);
+        row.setField(0, 1);
+        row.setField(1, new GenericArrayData(new float[] {1.0f, 2.0f, 3.0f}));
+
+        FlinkRowWrapper wrapper = new FlinkRowWrapper(row);
+        assertThat(wrapper.getVector(1).toFloatArray()).isEqualTo(new float[] {1.0f, 2.0f, 3.0f});
+    }
+
+    @Test
+    public void testNullVector() {
+        GenericRowData row = new GenericRowData(1);
+        row.setField(0, null);
+
+        FlinkRowWrapper wrapper = new FlinkRowWrapper(row);
+        assertThat(wrapper.isNullAt(0)).isTrue();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java
new file mode 100644
index 0000000000..142b53c7cb
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogicalTypeConversionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.VectorType;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link LogicalTypeConversion}. */
+public class LogicalTypeConversionTest {
+
+    @Test
+    public void testToVectorType() {
+        VectorType vectorType = LogicalTypeConversion.toVectorType(new FloatType(), "3");
+        assertThat(vectorType).isEqualTo(DataTypes.VECTOR(3, DataTypes.FLOAT()));
+    }
+
+    @Test
+    public void testToVectorTypeInvalidDim() {
+        assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), ""))
+                .isInstanceOf(IllegalArgumentException.class);
+        assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), "abc"))
+                .isInstanceOf(IllegalArgumentException.class);
+        assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), "0"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void testVectorTypeToLogicalType() {
+        LogicalType logicalType =
+                LogicalTypeConversion.toLogicalType(DataTypes.VECTOR(4, DataTypes.FLOAT()));
+        assertThat(logicalType).isInstanceOf(ArrayType.class);
+        ArrayType arrayType = (ArrayType) logicalType;
+        assertThat(arrayType.getElementType()).isInstanceOf(FloatType.class);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java
new file mode 100644
index 0000000000..8c918430e6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/VectorTypeTableITCase.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.ArrayUtils;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
+/** Test write and read table with vector type. */
+public class VectorTypeTableITCase extends CatalogITCaseBase {
+
+    private static final Random RANDOM = new Random();
+
+    private final String testTblName = "vector_table";
+
+    private final float[] testVector = randomVector();
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(getCreateTableDdl());
+    }
+
+    @Test
+    public void testBasic() throws Exception {
+        batchSql("SELECT * FROM %s", testTblName);
+        batchSql("INSERT INTO %s VALUES %s", testTblName, makeValueStr(1, false));
+        batchSql("INSERT INTO %s VALUES %s", testTblName, makeValueStr(2, false));
+
+        { // Check by Flink SQL.
+            List<Row> rows = batchSql("SELECT * FROM %s ORDER BY id ASC", testTblName);
+            Assertions.assertEquals(2, rows.size());
+
+            Row row = rows.get(0);
+            Assertions.assertEquals(1, (int) row.getFieldAs("id"));
+            Assertions.assertEquals("paimon", row.getFieldAs("data"));
+            Assertions.assertArrayEquals(ArrayUtils.toObject(testVector), row.getFieldAs("embed"));
+
+            row = rows.get(1);
+            Assertions.assertEquals(2, (int) row.getFieldAs("id"));
+            Assertions.assertEquals("paimon", row.getFieldAs("data"));
+            Assertions.assertArrayEquals(ArrayUtils.toObject(testVector), row.getFieldAs("embed"));
+        }
+
+        { // Check by Paimon API.
+            List<InternalRow> rows = innerReadData();
+            Assertions.assertEquals(2, rows.size());
+            rows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+            Assertions.assertEquals(2, rows.size());
+
+            InternalRow row = rows.get(0);
+            Assertions.assertEquals(1, row.getInt(0));
+            Assertions.assertEquals("paimon", row.getString(1).toString());
+            Assertions.assertArrayEquals(testVector, row.getVector(2).toFloatArray());
+
+            row = rows.get(1);
+            Assertions.assertEquals(2, row.getInt(0));
+            Assertions.assertEquals("paimon", row.getString(1).toString());
+            Assertions.assertArrayEquals(testVector, row.getVector(2).toFloatArray());
+        }
+
+        checkTableSchema();
+    }
+
+    @Test
+    public void testNullValues() throws Exception {
+        batchSql("SELECT * FROM %s", testTblName);
+        batchSql("INSERT INTO %s VALUES %s", testTblName, makeValueStr(1, false));
+        batchSql("INSERT INTO %s VALUES %s", testTblName, makeValueStr(2, true));
+        batchSql("INSERT INTO %s VALUES %s", testTblName, makeValueStr(3, false));
+        batchSql("INSERT INTO %s VALUES %s", testTblName, makeValueStr(4, true));
+
+        { // Check by Flink SQL.
+            List<Row> rows = batchSql("SELECT * FROM %s ORDER BY id ASC", testTblName);
+            Assertions.assertEquals(4, rows.size());
+
+            Row row = rows.get(0);
+            Assertions.assertEquals(1, (int) row.getFieldAs("id"));
+            Assertions.assertEquals("paimon", row.getFieldAs("data"));
+            Assertions.assertNotNull(row.getFieldAs("embed"));
+            Assertions.assertArrayEquals(ArrayUtils.toObject(testVector), row.getFieldAs("embed"));
+
+            row = rows.get(1);
+            Assertions.assertEquals(2, (int) row.getFieldAs("id"));
+            Assertions.assertEquals("paimon", row.getFieldAs("data"));
+            Assertions.assertNull(row.getFieldAs("embed"));
+        }
+
+        { // Check by Paimon API.
+            List<InternalRow> rows = innerReadData();
+            Assertions.assertEquals(4, rows.size());
+            rows.sort(Comparator.comparingInt(r -> r.getInt(0)));
+            Assertions.assertEquals(4, rows.size());
+
+            InternalRow row = rows.get(0);
+            Assertions.assertEquals(1, row.getInt(0));
+            Assertions.assertEquals("paimon", row.getString(1).toString());
+            Assertions.assertFalse(row.isNullAt(2));
+            Assertions.assertArrayEquals(testVector, row.getVector(2).toFloatArray());
+
+            row = rows.get(1);
+            Assertions.assertEquals(2, row.getInt(0));
+            Assertions.assertEquals("paimon", row.getString(1).toString());
+            Assertions.assertTrue(row.isNullAt(2));
+        }
+
+        checkTableSchema();
+    }
+
+    private void checkTableSchema() throws Exception {
+        DataType vectorType = DataTypes.VECTOR(testVector.length, DataTypes.FLOAT());
+        List<DataField> fields = paimonTable(testTblName).schema().fields();
+        Assertions.assertEquals(3, fields.size());
+        Assertions.assertEquals(DataTypes.INT(), fields.get(0).type());
+        Assertions.assertEquals(DataTypes.STRING(), fields.get(1).type());
+        Assertions.assertEquals(vectorType, fields.get(2).type());
+    }
+
+    private List<InternalRow> innerReadData() throws Exception {
+        ReadBuilder builder = paimonTable(testTblName).newReadBuilder();
+        RecordReader<InternalRow> reader = builder.newRead().createReader(builder.newScan().plan());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(
+                row -> {
+                    rows.add(row);
+                    Assertions.assertTrue(rows.size() < 10);
+                });
+        return rows;
+    }
+
+    private String getCreateTableDdl() {
+        return String.format(
+                "CREATE TABLE IF NOT EXISTS `%s` ("
+                        + "    `id` INT,"
+                        + "    `data` STRING,"
+                        + "    `embed` ARRAY<FLOAT>"
+                        + ") WITH ("
+                        + "    'file.format' = 'json',"
+                        + "    'file.compression' = 'none',"
+                        + "    'field.embed.vector-dim' = '%d'"
+                        + ")",
+                testTblName, testVector.length);
+    }
+
+    private String makeValueStr(int id, boolean nullVector) {
+        String vectorValueStr =
+                nullVector ? "CAST(NULL AS ARRAY<FLOAT>)" : ("ARRAY" + Arrays.toString(testVector));
+        return String.format("(%d, '%s', %s)", id, "paimon", vectorValueStr);
+    }
+
+    private float[] randomVector() {
+        byte[] randomBytes = new byte[RANDOM.nextInt(1024) + 1];
+        RANDOM.nextBytes(randomBytes);
+        float[] vector = new float[randomBytes.length];
+        for (int i = 0; i < vector.length; i++) {
+            vector[i] = randomBytes[i];
+        }
+        return vector;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
index c9a8c3e997..58f0119d45 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.utils;
 
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.types.DataType;
@@ -82,6 +83,30 @@ public class InternalRowSerializerTest {
         Assertions.assertThat(row.getLong(2)).isEqualTo(row1.getLong(2));
     }
 
+    @Test
+    public void testSerializeVector() throws Exception {
+        RowType vectorRowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("embed", DataTypes.VECTOR(3, DataTypes.FLOAT()))
+                        .build();
+        InternalRowTypeSerializer internalRowTypeSerializer =
+                new InternalRowTypeSerializer(
+                        vectorRowType.getFieldTypes().toArray(new DataType[0]));
+
+        float[] values = new float[] {1.0f, 2.0f, 3.0f};
+        InternalRow row = GenericRow.of(1, BinaryVector.fromPrimitiveArray(values));
+
+        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
+        internalRowTypeSerializer.serialize(row, dataOutputSerializer);
+        InternalRow row1 =
+                internalRowTypeSerializer.deserialize(
+                        new DataInputDeserializer(dataOutputSerializer.wrapAsByteBuffer()));
+
+        Assertions.assertThat(row1.getInt(0)).isEqualTo(1);
+        Assertions.assertThat(row1.getVector(1).toFloatArray()).isEqualTo(values);
+    }
+
     private BinaryString randomString() {
         int length = RANDOM.nextInt(100);
         byte[] buffer = new byte[length];

Reply via email to