This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1bba72385b [Improve][Connector-V2] update vectorType (#7446)
1bba72385b is described below
commit 1bba72385b6797dc5edd96fa5951376d0594e633
Author: corgy-w <[email protected]>
AuthorDate: Sat Aug 24 11:17:49 2024 +0800
[Improve][Connector-V2] update vectorType (#7446)
---
.../seatunnel/api/table/type/SeaTunnelRow.java | 1 -
.../seatunnel/api/table/type/VectorType.java | 12 +-
.../apache/seatunnel/common/utils/BufferUtils.java | 130 +++++++++++++++++++++
.../seatunnel/common/utils/BufferUtilsTest.java | 90 ++++++++++++++
.../seatunnel/fake/utils/FakeDataRandomUtils.java | 24 +---
.../oceanbase/OceanBaseMysqlJdbcRowConverter.java | 20 ++--
.../milvus/convert/MilvusConvertUtils.java | 13 +--
.../milvus/source/MilvusSourceReader.java | 3 +-
.../e2e/connector/v2/milvus/MilvusIT.java | 18 +--
9 files changed, 257 insertions(+), 54 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index fab1be17f2..801f037b18 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -143,7 +143,6 @@ public final class SeaTunnelRow implements Serializable {
case TIMESTAMP:
return 48;
case FLOAT_VECTOR:
- return getArrayNotNullSize((Object[]) v) * 4;
case FLOAT16_VECTOR:
case BFLOAT16_VECTOR:
case BINARY_VECTOR:
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
index 169c686688..75c5dff8a1 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
@@ -17,15 +17,23 @@
package org.apache.seatunnel.api.table.type;
+import org.apache.seatunnel.api.annotation.Experimental;
+
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
+/**
+ * VectorType represents a vector type in SeaTunnel.
+ *
+ * <p>Experimental feature, use with caution
+ */
+@Experimental
public class VectorType<T> implements SeaTunnelDataType<T> {
private static final long serialVersionUID = 2L;
- public static final VectorType<Float> VECTOR_FLOAT_TYPE =
- new VectorType<>(Float.class, SqlType.FLOAT_VECTOR);
+ public static final VectorType<ByteBuffer> VECTOR_FLOAT_TYPE =
+ new VectorType<>(ByteBuffer.class, SqlType.FLOAT_VECTOR);
public static final VectorType<Map> VECTOR_SPARSE_FLOAT_TYPE =
new VectorType<>(Map.class, SqlType.SPARSE_FLOAT_VECTOR);
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/BufferUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/BufferUtils.java
new file mode 100644
index 0000000000..ab27c8521f
--- /dev/null
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/BufferUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+public class BufferUtils {
+
+ public static ByteBuffer toByteBuffer(Short[] shortArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
+
+ for (Short value : shortArray) {
+ byteBuffer.putShort(value);
+ }
+
+ // Compatible compilation and running versions are not consistent
+ // Flip the buffer to prepare for reading
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Short[] toShortArray(ByteBuffer byteBuffer) {
+ Short[] shortArray = new Short[byteBuffer.capacity() / 2];
+
+ for (int i = 0; i < shortArray.length; i++) {
+ shortArray[i] = byteBuffer.getShort();
+ }
+
+ return shortArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Float[] floatArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
+
+ for (float value : floatArray) {
+ byteBuffer.putFloat(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Float[] toFloatArray(ByteBuffer byteBuffer) {
+ Float[] floatArray = new Float[byteBuffer.capacity() / 4];
+
+ for (int i = 0; i < floatArray.length; i++) {
+ floatArray[i] = byteBuffer.getFloat();
+ }
+
+ return floatArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Double[] doubleArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
+
+ for (double value : doubleArray) {
+ byteBuffer.putDouble(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
+ Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
+
+ for (int i = 0; i < doubleArray.length; i++) {
+ doubleArray[i] = byteBuffer.getDouble();
+ }
+
+ return doubleArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Integer[] intArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
+
+ for (int value : intArray) {
+ byteBuffer.putInt(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Integer[] toIntArray(ByteBuffer byteBuffer) {
+ Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
+
+ for (int i = 0; i < intArray.length; i++) {
+ intArray[i] = byteBuffer.getInt();
+ }
+
+ return intArray;
+ }
+}
diff --git
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/BufferUtilsTest.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/BufferUtilsTest.java
new file mode 100644
index 0000000000..36a010ec91
--- /dev/null
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/BufferUtilsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+public class BufferUtilsTest {
+
+ @Test
+ public void testToByteBufferAndToShortArray() {
+ Short[] shortArray = {1, 2, 3, 4, 5};
+ ByteBuffer byteBuffer = BufferUtils.toByteBuffer(shortArray);
+ Short[] resultArray = BufferUtils.toShortArray(byteBuffer);
+
+ Assertions.assertArrayEquals(shortArray, resultArray, "Short array
conversion failed");
+ }
+
+ @Test
+ public void testToByteBufferAndToFloatArray() {
+ Float[] floatArray = {1.1f, 2.2f, 3.3f, 4.4f, 5.5f};
+ ByteBuffer byteBuffer = BufferUtils.toByteBuffer(floatArray);
+ Float[] resultArray = BufferUtils.toFloatArray(byteBuffer);
+
+ Assertions.assertArrayEquals(floatArray, resultArray, "Float array
conversion failed");
+ }
+
+ @Test
+ public void testToByteBufferAndToDoubleArray() {
+ Double[] doubleArray = {1.1, 2.2, 3.3, 4.4, 5.5};
+ ByteBuffer byteBuffer = BufferUtils.toByteBuffer(doubleArray);
+ Double[] resultArray = BufferUtils.toDoubleArray(byteBuffer);
+
+ Assertions.assertArrayEquals(doubleArray, resultArray, "Double array
conversion failed");
+ }
+
+ @Test
+ public void testToByteBufferAndToIntArray() {
+ Integer[] intArray = {1, 2, 3, 4, 5};
+ ByteBuffer byteBuffer = BufferUtils.toByteBuffer(intArray);
+ Integer[] resultArray = BufferUtils.toIntArray(byteBuffer);
+
+ Assertions.assertArrayEquals(intArray, resultArray, "Integer array
conversion failed");
+ }
+
+ @Test
+ public void testEmptyArrayConversion() {
+ // Test empty arrays
+ Short[] shortArray = {};
+ ByteBuffer shortBuffer = BufferUtils.toByteBuffer(shortArray);
+ Short[] shortResultArray = BufferUtils.toShortArray(shortBuffer);
+ Assertions.assertArrayEquals(
+ shortArray, shortResultArray, "Empty Short array conversion
failed");
+
+ Float[] floatArray = {};
+ ByteBuffer floatBuffer = BufferUtils.toByteBuffer(floatArray);
+ Float[] floatResultArray = BufferUtils.toFloatArray(floatBuffer);
+ Assertions.assertArrayEquals(
+ floatArray, floatResultArray, "Empty Float array conversion
failed");
+
+ Double[] doubleArray = {};
+ ByteBuffer doubleBuffer = BufferUtils.toByteBuffer(doubleArray);
+ Double[] doubleResultArray = BufferUtils.toDoubleArray(doubleBuffer);
+ Assertions.assertArrayEquals(
+ doubleArray, doubleResultArray, "Empty Double array conversion
failed");
+
+ Integer[] intArray = {};
+ ByteBuffer intBuffer = BufferUtils.toByteBuffer(intArray);
+ Integer[] intResultArray = BufferUtils.toIntArray(intBuffer);
+ Assertions.assertArrayEquals(
+ intArray, intResultArray, "Empty Integer array conversion
failed");
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
index 8491a57732..8a8a14dc70 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.utils;
+import org.apache.seatunnel.common.utils.BufferUtils;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.commons.collections4.CollectionUtils;
@@ -24,7 +25,6 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import java.math.BigDecimal;
-import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -178,14 +178,14 @@ public class FakeDataRandomUtils {
return ByteBuffer.wrap(RandomUtils.nextBytes(byteCount));
}
- public Float[] randomFloatVector() {
+ public ByteBuffer randomFloatVector() {
Float[] floatVector = new Float[fakeConfig.getVectorDimension()];
for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
floatVector[i] =
RandomUtils.nextFloat(
fakeConfig.getVectorFloatMin(),
fakeConfig.getVectorFloatMax());
}
- return floatVector;
+ return BufferUtils.toByteBuffer(floatVector);
}
public ByteBuffer randomFloat16Vector() {
@@ -196,7 +196,7 @@ public class FakeDataRandomUtils {
fakeConfig.getVectorFloatMin(),
fakeConfig.getVectorFloatMax());
float16Vector[i] = floatToFloat16(value);
}
- return shortArrayToByteBuffer(float16Vector);
+ return BufferUtils.toByteBuffer(float16Vector);
}
public ByteBuffer randomBFloat16Vector() {
@@ -207,7 +207,7 @@ public class FakeDataRandomUtils {
fakeConfig.getVectorFloatMin(),
fakeConfig.getVectorFloatMax());
bfloat16Vector[i] = floatToBFloat16(value);
}
- return shortArrayToByteBuffer(bfloat16Vector);
+ return BufferUtils.toByteBuffer(bfloat16Vector);
}
public Map<Integer, Float> randomSparseFloatVector() {
@@ -242,20 +242,6 @@ public class FakeDataRandomUtils {
return (short) (sign | (exponent << 10) | (mantissa >> 13));
}
- private static ByteBuffer shortArrayToByteBuffer(Short[] shortArray) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
-
- for (Short value : shortArray) {
- byteBuffer.putShort(value);
- }
-
- // Compatible compilation and running versions are not consistent
- // Flip the buffer to prepare for reading
- ((Buffer) byteBuffer).flip();
-
- return byteBuffer;
- }
-
private static short floatToBFloat16(float value) {
int intBits = Float.floatToIntBits(value);
return (short) (intBits >> 16);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index 2092a54f98..a498879138 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.BufferUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
@@ -32,6 +33,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -40,8 +42,6 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Optional;
public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -89,11 +89,13 @@ public class OceanBaseMysqlJdbcRowConverter extends
AbstractJdbcRowConverter {
fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs,
resultSetIndex);
break;
case FLOAT_VECTOR:
- List<Float> vector = new ArrayList<>();
- for (Object o : (Object[]) rs.getObject(fieldIndex)) {
- vector.add(Float.parseFloat(o.toString()));
+ Object[] objects = (Object[]) rs.getObject(fieldIndex);
+ Float[] arrays = new Float[objects.length];
+ for (int i = 0; i < objects.length; i++) {
+ arrays[i] = Float.parseFloat(objects[i].toString());
}
- fields[fieldIndex] = vector;
+ fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
+ break;
case DOUBLE:
fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs,
resultSetIndex);
break;
@@ -172,8 +174,10 @@ public class OceanBaseMysqlJdbcRowConverter extends
AbstractJdbcRowConverter {
statement.setFloat(statementIndex, (Float)
row.getField(fieldIndex));
break;
case FLOAT_VECTOR:
- if (row.getField(fieldIndex) instanceof Float[]) {
- Float[] floatArray = (Float[])
row.getField(fieldIndex);
+ if (row.getField(fieldIndex) instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer)
row.getField(fieldIndex);
+ // Convert ByteBuffer to Float[]
+ Float[] floatArray =
BufferUtils.toFloatArray(byteBuffer);
StringBuilder vector = new StringBuilder();
vector.append("[");
for (Float aFloat : floatArray) {
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
index 6886707382..0e7a898b23 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.api.table.type.VectorType;
+import org.apache.seatunnel.common.utils.BufferUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
@@ -320,16 +321,14 @@ public class MilvusConvertUtils {
case DATE:
return value.toString();
case FLOAT_VECTOR:
- List<Float> vector = new ArrayList<>();
- for (Object o : (Object[]) value) {
- vector.add(Float.parseFloat(o.toString()));
- }
- return vector;
+ ByteBuffer floatVectorBuffer = (ByteBuffer) value;
+ Float[] floats = BufferUtils.toFloatArray(floatVectorBuffer);
+ return Arrays.stream(floats).collect(Collectors.toList());
case BINARY_VECTOR:
case BFLOAT16_VECTOR:
case FLOAT16_VECTOR:
- ByteBuffer binaryVector = (ByteBuffer) value;
- return gson.toJsonTree(binaryVector.array());
+ ByteBuffer vector = (ByteBuffer) value;
+ return gson.toJsonTree(vector.array());
case SPARSE_FLOAT_VECTOR:
return
JsonParser.parseString(JacksonUtils.toJsonString(value)).getAsJsonObject();
case FLOAT:
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
index b8b391d0d4..7464c652b3 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.BufferUtils;
import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
@@ -220,7 +221,7 @@ public class MilvusSourceReader implements
SourceReader<SeaTunnelRow, MilvusSour
for (int i = 0; i < list.size(); i++) {
arrays[i] =
Float.parseFloat(list.get(i).toString());
}
- fields[fieldIndex] = arrays;
+ fields[fieldIndex] = BufferUtils.toByteBuffer(arrays);
break;
} else {
throw new MilvusConnectorException(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index cead62af7e..d406250e77 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.e2e.connector.v2.milvus;
+import org.apache.seatunnel.common.utils.BufferUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -54,7 +55,6 @@ import io.milvus.param.index.CreateIndexParam;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -227,7 +227,7 @@ public class MilvusIT extends TestSuiteBase implements
TestResource {
List<Float> vector = Arrays.asList((float) i, (float) i, (float)
i, (float) i);
row.add(VECTOR_FIELD, gson.toJsonTree(vector));
Short[] shorts = {(short) i, (short) i, (short) i, (short) i};
- ByteBuffer shortByteBuffer = shortArrayToByteBuffer(shorts);
+ ByteBuffer shortByteBuffer = BufferUtils.toByteBuffer(shorts);
row.add(VECTOR_FIELD2, gson.toJsonTree(shortByteBuffer.array()));
ByteBuffer binaryByteBuffer = ByteBuffer.wrap(new byte[] {16});
row.add(VECTOR_FIELD3, gson.toJsonTree(binaryByteBuffer.array()));
@@ -362,18 +362,4 @@ public class MilvusIT extends TestSuiteBase implements
TestResource {
Assertions.assertTrue(fileds.contains("book_intro_3"));
Assertions.assertTrue(fileds.contains("book_intro_4"));
}
-
- private static ByteBuffer shortArrayToByteBuffer(Short[] shortArray) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2); //
2 bytes per short
-
- for (Short value : shortArray) {
- byteBuffer.putShort(value);
- }
-
- // Compatible compilation and running versions are not consistent
- // Flip the buffer to prepare for reading
- ((Buffer) byteBuffer).flip();
-
- return byteBuffer;
- }
}