This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1bba72385b [Improve][Connector-V2] update vectorType (#7446)
1bba72385b is described below

commit 1bba72385b6797dc5edd96fa5951376d0594e633
Author: corgy-w <[email protected]>
AuthorDate: Sat Aug 24 11:17:49 2024 +0800

    [Improve][Connector-V2] update vectorType (#7446)
---
 .../seatunnel/api/table/type/SeaTunnelRow.java     |   1 -
 .../seatunnel/api/table/type/VectorType.java       |  12 +-
 .../apache/seatunnel/common/utils/BufferUtils.java | 130 +++++++++++++++++++++
 .../seatunnel/common/utils/BufferUtilsTest.java    |  90 ++++++++++++++
 .../seatunnel/fake/utils/FakeDataRandomUtils.java  |  24 +---
 .../oceanbase/OceanBaseMysqlJdbcRowConverter.java  |  20 ++--
 .../milvus/convert/MilvusConvertUtils.java         |  13 +--
 .../milvus/source/MilvusSourceReader.java          |   3 +-
 .../e2e/connector/v2/milvus/MilvusIT.java          |  18 +--
 9 files changed, 257 insertions(+), 54 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index fab1be17f2..801f037b18 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -143,7 +143,6 @@ public final class SeaTunnelRow implements Serializable {
             case TIMESTAMP:
                 return 48;
             case FLOAT_VECTOR:
-                return getArrayNotNullSize((Object[]) v) * 4;
             case FLOAT16_VECTOR:
             case BFLOAT16_VECTOR:
             case BINARY_VECTOR:
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
index 169c686688..75c5dff8a1 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
@@ -17,15 +17,23 @@
 
 package org.apache.seatunnel.api.table.type;
 
+import org.apache.seatunnel.api.annotation.Experimental;
+
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
+/**
+ * VectorType represents a vector type in SeaTunnel.
+ *
+ * <p>Experimental feature, use with caution
+ */
+@Experimental
 public class VectorType<T> implements SeaTunnelDataType<T> {
     private static final long serialVersionUID = 2L;
 
-    public static final VectorType<Float> VECTOR_FLOAT_TYPE =
-            new VectorType<>(Float.class, SqlType.FLOAT_VECTOR);
+    public static final VectorType<ByteBuffer> VECTOR_FLOAT_TYPE =
+            new VectorType<>(ByteBuffer.class, SqlType.FLOAT_VECTOR);
 
     public static final VectorType<Map> VECTOR_SPARSE_FLOAT_TYPE =
             new VectorType<>(Map.class, SqlType.SPARSE_FLOAT_VECTOR);
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/BufferUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/BufferUtils.java
new file mode 100644
index 0000000000..ab27c8521f
--- /dev/null
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/BufferUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+/** Converts boxed numeric arrays to and from ByteBuffers; reads never move the source position. */
+public class BufferUtils {
+    /** Packs the shorts into a new buffer, flipped so reading starts at the first element. */
+    public static ByteBuffer toByteBuffer(Short[] shortArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
+        for (Short value : shortArray) {
+            byteBuffer.putShort(value);
+        }
+
+        // Flip via the Buffer supertype: ByteBuffer.flip() only exists as a covariant override
+        // since JDK 9, so a direct call compiled on JDK 9+ would not link on a Java 8 runtime.
+        ((Buffer) byteBuffer).flip();
+        return byteBuffer;
+    }
+
+    /** Decodes [position, limit) as shorts in the buffer's byte order; position is not moved. */
+    public static Short[] toShortArray(ByteBuffer byteBuffer) {
+        // duplicate() always starts out BIG_ENDIAN, so carry the source order across
+        ByteBuffer view = byteBuffer.duplicate().order(byteBuffer.order());
+        Short[] shortArray = new Short[view.remaining() / 2];
+        for (int i = 0; i < shortArray.length; i++) {
+            shortArray[i] = view.getShort();
+        }
+        return shortArray;
+    }
+
+    /** Packs the floats into a new buffer, flipped so reading starts at the first element. */
+    public static ByteBuffer toByteBuffer(Float[] floatArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
+        for (float value : floatArray) {
+            byteBuffer.putFloat(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+        return byteBuffer;
+    }
+
+    /** Decodes [position, limit) as floats in the buffer's byte order; position is not moved. */
+    public static Float[] toFloatArray(ByteBuffer byteBuffer) {
+        // duplicate() always starts out BIG_ENDIAN, so carry the source order across
+        ByteBuffer view = byteBuffer.duplicate().order(byteBuffer.order());
+        Float[] floatArray = new Float[view.remaining() / 4];
+        for (int i = 0; i < floatArray.length; i++) {
+            floatArray[i] = view.getFloat();
+        }
+        return floatArray;
+    }
+
+    /** Packs the doubles into a new buffer, flipped so reading starts at the first element. */
+    public static ByteBuffer toByteBuffer(Double[] doubleArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
+        for (double value : doubleArray) {
+            byteBuffer.putDouble(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+        return byteBuffer;
+    }
+
+    /** Decodes [position, limit) as doubles in the buffer's byte order; position is not moved. */
+    public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
+        // duplicate() always starts out BIG_ENDIAN, so carry the source order across
+        ByteBuffer view = byteBuffer.duplicate().order(byteBuffer.order());
+        Double[] doubleArray = new Double[view.remaining() / 8];
+        for (int i = 0; i < doubleArray.length; i++) {
+            doubleArray[i] = view.getDouble();
+        }
+        return doubleArray;
+    }
+
+    /** Packs the ints into a new buffer, flipped so reading starts at the first element. */
+    public static ByteBuffer toByteBuffer(Integer[] intArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
+        for (int value : intArray) {
+            byteBuffer.putInt(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+        return byteBuffer;
+    }
+
+    /** Decodes [position, limit) as ints in the buffer's byte order; position is not moved. */
+    public static Integer[] toIntArray(ByteBuffer byteBuffer) {
+        // duplicate() always starts out BIG_ENDIAN, so carry the source order across
+        ByteBuffer view = byteBuffer.duplicate().order(byteBuffer.order());
+        Integer[] intArray = new Integer[view.remaining() / 4];
+        for (int i = 0; i < intArray.length; i++) {
+            intArray[i] = view.getInt();
+        }
+        return intArray;
+    }
+}
diff --git 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/BufferUtilsTest.java
 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/BufferUtilsTest.java
new file mode 100644
index 0000000000..36a010ec91
--- /dev/null
+++ 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/BufferUtilsTest.java
@@ -0,0 +1,90 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+/** Round-trip checks: array to ByteBuffer and back must reproduce every supported boxed type. */
+public class BufferUtilsTest {
+    @Test
+    public void testToByteBufferAndToShortArray() {
+        Short[] shortArray = {1, 2, 3, 4, 5};
+        ByteBuffer byteBuffer = BufferUtils.toByteBuffer(shortArray);
+        Short[] resultArray = BufferUtils.toShortArray(byteBuffer);
+
+        Assertions.assertArrayEquals(shortArray, resultArray, "Short array conversion failed");
+    }
+
+    @Test
+    public void testToByteBufferAndToFloatArray() {
+        Float[] floatArray = {1.1f, 2.2f, 3.3f, 4.4f, 5.5f};
+        ByteBuffer byteBuffer = BufferUtils.toByteBuffer(floatArray);
+        Float[] resultArray = BufferUtils.toFloatArray(byteBuffer);
+
+        Assertions.assertArrayEquals(floatArray, resultArray, "Float array conversion failed");
+    }
+
+    @Test
+    public void testToByteBufferAndToDoubleArray() {
+        Double[] doubleArray = {1.1, 2.2, 3.3, 4.4, 5.5};
+        ByteBuffer byteBuffer = BufferUtils.toByteBuffer(doubleArray);
+        Double[] resultArray = BufferUtils.toDoubleArray(byteBuffer);
+
+        Assertions.assertArrayEquals(doubleArray, resultArray, "Double array conversion failed");
+    }
+
+    @Test
+    public void testToByteBufferAndToIntArray() {
+        Integer[] intArray = {1, 2, 3, 4, 5};
+        ByteBuffer byteBuffer = BufferUtils.toByteBuffer(intArray);
+        Integer[] resultArray = BufferUtils.toIntArray(byteBuffer);
+
+        Assertions.assertArrayEquals(intArray, resultArray, "Integer array conversion failed");
+    }
+
+    @Test
+    public void testEmptyArrayConversion() {
+        // Test empty arrays
+        Short[] shortArray = {};
+        ByteBuffer shortBuffer = BufferUtils.toByteBuffer(shortArray);
+        Short[] shortResultArray = BufferUtils.toShortArray(shortBuffer);
+        Assertions.assertArrayEquals(
+                shortArray, shortResultArray, "Empty Short array conversion failed");
+
+        Float[] floatArray = {};
+        ByteBuffer floatBuffer = BufferUtils.toByteBuffer(floatArray);
+        Float[] floatResultArray = BufferUtils.toFloatArray(floatBuffer);
+        Assertions.assertArrayEquals(
+                floatArray, floatResultArray, "Empty Float array conversion failed");
+
+        Double[] doubleArray = {};
+        ByteBuffer doubleBuffer = BufferUtils.toByteBuffer(doubleArray);
+        Double[] doubleResultArray = BufferUtils.toDoubleArray(doubleBuffer);
+        Assertions.assertArrayEquals(
+                doubleArray, doubleResultArray, "Empty Double array conversion failed");
+
+        Integer[] intArray = {};
+        ByteBuffer intBuffer = BufferUtils.toByteBuffer(intArray);
+        Integer[] intResultArray = BufferUtils.toIntArray(intBuffer);
+        Assertions.assertArrayEquals(
+                intArray, intResultArray, "Empty Integer array conversion failed");
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
index 8491a57732..8a8a14dc70 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.utils;
 
+import org.apache.seatunnel.common.utils.BufferUtils;
 import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -24,7 +25,6 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
 
 import java.math.BigDecimal;
-import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -178,14 +178,14 @@ public class FakeDataRandomUtils {
         return ByteBuffer.wrap(RandomUtils.nextBytes(byteCount));
     }
 
-    public Float[] randomFloatVector() {
+    public ByteBuffer randomFloatVector() {
         Float[] floatVector = new Float[fakeConfig.getVectorDimension()];
         for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
             floatVector[i] =
                     RandomUtils.nextFloat(
                             fakeConfig.getVectorFloatMin(), 
fakeConfig.getVectorFloatMax());
         }
-        return floatVector;
+        return BufferUtils.toByteBuffer(floatVector);
     }
 
     public ByteBuffer randomFloat16Vector() {
@@ -196,7 +196,7 @@ public class FakeDataRandomUtils {
                             fakeConfig.getVectorFloatMin(), 
fakeConfig.getVectorFloatMax());
             float16Vector[i] = floatToFloat16(value);
         }
-        return shortArrayToByteBuffer(float16Vector);
+        return BufferUtils.toByteBuffer(float16Vector);
     }
 
     public ByteBuffer randomBFloat16Vector() {
@@ -207,7 +207,7 @@ public class FakeDataRandomUtils {
                             fakeConfig.getVectorFloatMin(), 
fakeConfig.getVectorFloatMax());
             bfloat16Vector[i] = floatToBFloat16(value);
         }
-        return shortArrayToByteBuffer(bfloat16Vector);
+        return BufferUtils.toByteBuffer(bfloat16Vector);
     }
 
     public Map<Integer, Float> randomSparseFloatVector() {
@@ -242,20 +242,6 @@ public class FakeDataRandomUtils {
         return (short) (sign | (exponent << 10) | (mantissa >> 13));
     }
 
-    private static ByteBuffer shortArrayToByteBuffer(Short[] shortArray) {
-        ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
-
-        for (Short value : shortArray) {
-            byteBuffer.putShort(value);
-        }
-
-        // Compatible compilation and running versions are not consistent
-        // Flip the buffer to prepare for reading
-        ((Buffer) byteBuffer).flip();
-
-        return byteBuffer;
-    }
-
     private static short floatToBFloat16(float value) {
         int intBits = Float.floatToIntBits(value);
         return (short) (intBits >> 16);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index 2092a54f98..a498879138 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.BufferUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
@@ -32,6 +33,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -40,8 +42,6 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 
 public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -89,11 +89,13 @@ public class OceanBaseMysqlJdbcRowConverter extends 
AbstractJdbcRowConverter {
                     fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, 
resultSetIndex);
                     break;
                 case FLOAT_VECTOR:
-                    List<Float> vector = new ArrayList<>();
-                    for (Object o : (Object[]) rs.getObject(fieldIndex)) {
-                        vector.add(Float.parseFloat(o.toString()));
+                    Object[] objects = (Object[]) rs.getObject(fieldIndex);
+                    Float[] arrays = new Float[objects.length];
+                    for (int i = 0; i < objects.length; i++) {
+                        arrays[i] = Float.parseFloat(objects[i].toString());
                     }
-                    fields[fieldIndex] = vector;
+                    fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
+                    break;
                 case DOUBLE:
                     fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, 
resultSetIndex);
                     break;
@@ -172,8 +174,10 @@ public class OceanBaseMysqlJdbcRowConverter extends 
AbstractJdbcRowConverter {
                         statement.setFloat(statementIndex, (Float) 
row.getField(fieldIndex));
                         break;
                     case FLOAT_VECTOR:
-                        if (row.getField(fieldIndex) instanceof Float[]) {
-                            Float[] floatArray = (Float[]) 
row.getField(fieldIndex);
+                        if (row.getField(fieldIndex) instanceof ByteBuffer) {
+                            ByteBuffer byteBuffer = (ByteBuffer) 
row.getField(fieldIndex);
+                            // Convert ByteBuffer to Float[]
+                            Float[] floatArray = 
BufferUtils.toFloatArray(byteBuffer);
                             StringBuilder vector = new StringBuilder();
                             vector.append("[");
                             for (Float aFloat : floatArray) {
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
index 6886707382..0e7a898b23 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.api.table.type.VectorType;
+import org.apache.seatunnel.common.utils.BufferUtils;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
@@ -320,16 +321,14 @@ public class MilvusConvertUtils {
             case DATE:
                 return value.toString();
             case FLOAT_VECTOR:
-                List<Float> vector = new ArrayList<>();
-                for (Object o : (Object[]) value) {
-                    vector.add(Float.parseFloat(o.toString()));
-                }
-                return vector;
+                ByteBuffer floatVectorBuffer = (ByteBuffer) value;
+                Float[] floats = BufferUtils.toFloatArray(floatVectorBuffer);
+                return Arrays.stream(floats).collect(Collectors.toList());
             case BINARY_VECTOR:
             case BFLOAT16_VECTOR:
             case FLOAT16_VECTOR:
-                ByteBuffer binaryVector = (ByteBuffer) value;
-                return gson.toJsonTree(binaryVector.array());
+                ByteBuffer vector = (ByteBuffer) value;
+                return gson.toJsonTree(vector.array());
             case SPARSE_FLOAT_VECTOR:
                 return 
JsonParser.parseString(JacksonUtils.toJsonString(value)).getAsJsonObject();
             case FLOAT:
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
index b8b391d0d4..7464c652b3 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.BufferUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
@@ -220,7 +221,7 @@ public class MilvusSourceReader implements 
SourceReader<SeaTunnelRow, MilvusSour
                         for (int i = 0; i < list.size(); i++) {
                             arrays[i] = 
Float.parseFloat(list.get(i).toString());
                         }
-                        fields[fieldIndex] = arrays;
+                        fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
                         break;
                     } else {
                         throw new MilvusConnectorException(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index cead62af7e..d406250e77 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.e2e.connector.v2.milvus;
 
+import org.apache.seatunnel.common.utils.BufferUtils;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -54,7 +55,6 @@ import io.milvus.param.index.CreateIndexParam;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -227,7 +227,7 @@ public class MilvusIT extends TestSuiteBase implements 
TestResource {
             List<Float> vector = Arrays.asList((float) i, (float) i, (float) 
i, (float) i);
             row.add(VECTOR_FIELD, gson.toJsonTree(vector));
             Short[] shorts = {(short) i, (short) i, (short) i, (short) i};
-            ByteBuffer shortByteBuffer = shortArrayToByteBuffer(shorts);
+            ByteBuffer shortByteBuffer = BufferUtils.toByteBuffer(shorts);
             row.add(VECTOR_FIELD2, gson.toJsonTree(shortByteBuffer.array()));
             ByteBuffer binaryByteBuffer = ByteBuffer.wrap(new byte[] {16});
             row.add(VECTOR_FIELD3, gson.toJsonTree(binaryByteBuffer.array()));
@@ -362,18 +362,4 @@ public class MilvusIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertTrue(fileds.contains("book_intro_3"));
         Assertions.assertTrue(fileds.contains("book_intro_4"));
     }
-
-    private static ByteBuffer shortArrayToByteBuffer(Short[] shortArray) {
-        ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2); // 
2 bytes per short
-
-        for (Short value : shortArray) {
-            byteBuffer.putShort(value);
-        }
-
-        // Compatible compilation and running versions are not consistent
-        // Flip the buffer to prepare for reading
-        ((Buffer) byteBuffer).flip();
-
-        return byteBuffer;
-    }
 }

Reply via email to