This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a6b188d552 [Feature][Connector-V2][OceanBase] Support vector types on 
OceanBase (#7375)
a6b188d552 is described below

commit a6b188d5528ba914c289f65bb910621659390fe3
Author: zhouyh <[email protected]>
AuthorDate: Wed Aug 21 22:24:33 2024 +0800

    [Feature][Connector-V2][OceanBase] Support vector types on OceanBase (#7375)
---
 .../oceanbase/OceanBaseMysqlJdbcRowConverter.java  | 206 ++++++++++
 .../connector-jdbc-e2e-part-2/pom.xml              |  22 +-
 .../seatunnel/jdbc/JdbcOceanBaseMilvusIT.java      | 435 +++++++++++++++++++++
 .../resources/jdbc_fake_to_oceanbase_sink.conf     |  69 ++++
 .../jdbc_milvus_source_and_oceanbase_sink.conf     |  47 +++
 .../container/seatunnel/SeaTunnelContainer.java    |   4 +-
 6 files changed, 780 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index 2033518108..2092a54f98 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -17,14 +17,32 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
 
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
+import java.math.BigDecimal;
+import java.sql.Date;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
 
 public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
     @Override
@@ -39,4 +57,192 @@ public class OceanBaseMysqlJdbcRowConverter extends 
AbstractJdbcRowConverter {
         statement.setTimestamp(
                 index, 
java.sql.Timestamp.valueOf(LocalDateTime.of(LocalDate.now(), time)));
     }
+
+    @Override
+    public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) 
throws SQLException {
+        SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
+        Object[] fields = new Object[typeInfo.getTotalFields()];
+        for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); 
fieldIndex++) {
+            SeaTunnelDataType<?> seaTunnelDataType = 
typeInfo.getFieldType(fieldIndex);
+            String fieldName = typeInfo.getFieldName(fieldIndex);
+            int resultSetIndex = fieldIndex + 1;
+            switch (seaTunnelDataType.getSqlType()) {
+                case STRING:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, 
resultSetIndex);
+                    break;
+                case BOOLEAN:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs, 
resultSetIndex);
+                    break;
+                case TINYINT:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs, 
resultSetIndex);
+                    break;
+                case SMALLINT:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs, 
resultSetIndex);
+                    break;
+                case INT:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs, 
resultSetIndex);
+                    break;
+                case BIGINT:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs, 
resultSetIndex);
+                    break;
+                case FLOAT:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs, 
resultSetIndex);
+                    break;
+                case FLOAT_VECTOR:
+                    List<Float> vector = new ArrayList<>();
+                    for (Object o : (Object[]) rs.getObject(fieldIndex)) {
+                        vector.add(Float.parseFloat(o.toString()));
+                    }
+                    fields[fieldIndex] = vector;
+                case DOUBLE:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs, 
resultSetIndex);
+                    break;
+                case DECIMAL:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs, 
resultSetIndex);
+                    break;
+                case DATE:
+                    Date sqlDate = JdbcFieldTypeUtils.getDate(rs, 
resultSetIndex);
+                    fields[fieldIndex] =
+                            Optional.ofNullable(sqlDate).map(e -> 
e.toLocalDate()).orElse(null);
+                    break;
+                case TIME:
+                    fields[fieldIndex] = readTime(rs, resultSetIndex);
+                    break;
+                case TIMESTAMP:
+                    Timestamp sqlTimestamp = 
JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
+                    fields[fieldIndex] =
+                            Optional.ofNullable(sqlTimestamp)
+                                    .map(e -> e.toLocalDateTime())
+                                    .orElse(null);
+                    break;
+                case BYTES:
+                    fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, 
resultSetIndex);
+                    break;
+                case NULL:
+                    fields[fieldIndex] = null;
+                    break;
+                case ARRAY:
+                    fields[fieldIndex] =
+                            convertToArray(rs, resultSetIndex, 
seaTunnelDataType, fieldName);
+                    break;
+                case MAP:
+                case ROW:
+                default:
+                    throw CommonError.unsupportedDataType(
+                            converterName(), 
seaTunnelDataType.getSqlType().toString(), fieldName);
+            }
+        }
+        return new SeaTunnelRow(fields);
+    }
+
+    @Override
+    public PreparedStatement toExternal(
+            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement 
statement)
+            throws SQLException {
+        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
+        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); 
fieldIndex++) {
+            try {
+                SeaTunnelDataType<?> seaTunnelDataType = 
rowType.getFieldType(fieldIndex);
+                int statementIndex = fieldIndex + 1;
+                Object fieldValue = row.getField(fieldIndex);
+                if (fieldValue == null) {
+                    statement.setObject(statementIndex, null);
+                    continue;
+                }
+                switch (seaTunnelDataType.getSqlType()) {
+                    case STRING:
+                        statement.setString(statementIndex, (String) 
row.getField(fieldIndex));
+                        break;
+                    case BOOLEAN:
+                        statement.setBoolean(statementIndex, (Boolean) 
row.getField(fieldIndex));
+                        break;
+                    case TINYINT:
+                        statement.setByte(statementIndex, (Byte) 
row.getField(fieldIndex));
+                        break;
+                    case SMALLINT:
+                        statement.setShort(statementIndex, (Short) 
row.getField(fieldIndex));
+                        break;
+                    case INT:
+                        statement.setInt(statementIndex, (Integer) 
row.getField(fieldIndex));
+                        break;
+                    case BIGINT:
+                        statement.setLong(statementIndex, (Long) 
row.getField(fieldIndex));
+                        break;
+                    case FLOAT:
+                        statement.setFloat(statementIndex, (Float) 
row.getField(fieldIndex));
+                        break;
+                    case FLOAT_VECTOR:
+                        if (row.getField(fieldIndex) instanceof Float[]) {
+                            Float[] floatArray = (Float[]) 
row.getField(fieldIndex);
+                            StringBuilder vector = new StringBuilder();
+                            vector.append("[");
+                            for (Float aFloat : floatArray) {
+                                vector.append(aFloat).append(", ");
+                            }
+                            if (vector.length() > 0) {
+                                vector.setLength(vector.length() - 2);
+                            }
+                            vector.append("]");
+                            statement.setString(statementIndex, 
vector.toString());
+                        }
+                        break;
+                    case DOUBLE:
+                        statement.setDouble(statementIndex, (Double) 
row.getField(fieldIndex));
+                        break;
+                    case DECIMAL:
+                        statement.setBigDecimal(
+                                statementIndex, (BigDecimal) 
row.getField(fieldIndex));
+                        break;
+                    case DATE:
+                        LocalDate localDate = (LocalDate) 
row.getField(fieldIndex);
+                        statement.setDate(statementIndex, 
java.sql.Date.valueOf(localDate));
+                        break;
+                    case TIME:
+                        writeTime(statement, statementIndex, (LocalTime) 
row.getField(fieldIndex));
+                        break;
+                    case TIMESTAMP:
+                        LocalDateTime localDateTime = (LocalDateTime) 
row.getField(fieldIndex);
+                        statement.setTimestamp(
+                                statementIndex, 
java.sql.Timestamp.valueOf(localDateTime));
+                        break;
+                    case BYTES:
+                        statement.setBytes(statementIndex, (byte[]) 
row.getField(fieldIndex));
+                        break;
+                    case NULL:
+                        statement.setNull(statementIndex, java.sql.Types.NULL);
+                        break;
+                    case ARRAY:
+                        SeaTunnelDataType elementType =
+                                ((ArrayType) 
seaTunnelDataType).getElementType();
+                        Object[] array = (Object[]) row.getField(fieldIndex);
+                        if (array == null) {
+                            statement.setNull(statementIndex, 
java.sql.Types.ARRAY);
+                            break;
+                        }
+                        if (SqlType.TINYINT.equals(elementType.getSqlType())) {
+                            Short[] shortArray = new Short[array.length];
+                            for (int i = 0; i < array.length; i++) {
+                                shortArray[i] = 
Short.valueOf(array[i].toString());
+                            }
+                            statement.setObject(statementIndex, shortArray);
+                        } else {
+                            statement.setObject(statementIndex, array);
+                        }
+                        break;
+                    case MAP:
+                    case ROW:
+                    default:
+                        throw new JdbcConnectorException(
+                                
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                                "Unexpected value: " + seaTunnelDataType);
+                }
+            } catch (Exception e) {
+                throw new JdbcConnectorException(
+                        JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED,
+                        "error field:" + rowType.getFieldNames()[fieldIndex],
+                        e);
+            }
+        }
+        return statement;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
index d8f631d3e8..d5f689f973 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/pom.xml
@@ -24,7 +24,9 @@
 
     <artifactId>connector-jdbc-e2e-part-2</artifactId>
     <name>SeaTunnel : E2E : Connector V2 : Jdbc : Part 2</name>
-
+    <properties>
+        <testcontainer.milvus.version>1.19.8</testcontainer.milvus.version>
+    </properties>
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -33,7 +35,23 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.9</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-milvus</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>milvus</artifactId>
+            <version>${testcontainer.milvus.version}</version>
+        </dependency>
         <!-- drivers -->
         <dependency>
             <groupId>com.aliyun.phoenix</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
new file mode 100644
index 0000000000..36e66ca9d8
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMilvusIT.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.PullPolicy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.milvus.MilvusContainer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.MutationResult;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.LoadCollectionParam;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.index.CreateIndexParam;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK not support adapt")
+public class JdbcOceanBaseMilvusIT extends TestSuiteBase implements 
TestResource {
+
+    private static final String IMAGE = "oceanbase/oceanbase-ce:vector";
+
+    private static final String HOSTNAME = "e2e_oceanbase_vector";
+    private static final int PORT = 2881;
+    private static final String USERNAME = "root@test";
+    private static final String PASSWORD = "";
+    private static final String OCEANBASE_DATABASE = "seatunnel";
+    private GenericContainer<?> dbServer;
+    private Connection connection;
+    private JdbcCase jdbcCase;
+    private static final String OCEANBASE_SINK = "simple_example";
+
+    private static final String HOST = "HOST";
+    private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" 
+ HOST + ":%s/%s";
+    private static final String OCEANBASE_DRIVER_CLASS = 
"com.oceanbase.jdbc.Driver";
+
+    private static final String MILVUS_HOST = "milvus-e2e";
+    private static final String MILVUS_IMAGE = 
"milvusdb/milvus:2.4-20240711-7e2a9d6b";
+    private static final String TOKEN = "root:Milvus";
+    private MilvusContainer container;
+    private MilvusServiceClient milvusClient;
+    private static final String COLLECTION_NAME = "simple_example";
+    private static final String ID_FIELD = "book_id";
+    private static final String VECTOR_FIELD = "book_intro";
+    private static final String TITLE_FIELD = "book_title";
+    private static final Integer VECTOR_DIM = 4;
+    private static final Gson gson = new Gson();
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && 
cd /tmp/seatunnel/plugins/Jdbc/lib && wget "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), 
extraCommands.getStderr());
+            };
+
+    String driverUrl() {
+        return 
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar";;
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        dbServer = initOceanbaseContainer();
+
+        Startables.deepStart(Stream.of(dbServer)).join();
+        jdbcCase = getJdbcCase();
+        given().ignoreExceptions()
+                .await()
+                .atMost(360, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+
+        createSchemaIfNeeded();
+        createNeededTables();
+        this.container =
+                new MilvusContainer(MILVUS_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MILVUS_HOST);
+        Startables.deepStart(Stream.of(this.container)).join();
+        log.info("Milvus host is {}", container.getHost());
+        log.info("Milvus container started");
+        Awaitility.given().ignoreExceptions().await().atMost(720L, 
TimeUnit.SECONDS);
+        this.initMilvus();
+        this.initSourceData();
+    }
+
+    private void initMilvus()
+            throws SQLException, ClassNotFoundException, 
InstantiationException,
+                    IllegalAccessException {
+        milvusClient =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(this.container.getEndpoint())
+                                .withToken(TOKEN)
+                                .build());
+    }
+
+    private void initSourceData() {
+        // Define fields
+        List<FieldType> fieldsSchema =
+                Arrays.asList(
+                        FieldType.newBuilder()
+                                .withName(ID_FIELD)
+                                .withDataType(DataType.Int64)
+                                .withPrimaryKey(true)
+                                .withAutoID(false)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD)
+                                .withDataType(DataType.FloatVector)
+                                .withDimension(VECTOR_DIM)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(TITLE_FIELD)
+                                .withDataType(DataType.VarChar)
+                                .withMaxLength(64)
+                                .build());
+
+        // Create the collection with 3 fields
+        R<RpcStatus> ret =
+                milvusClient.createCollection(
+                        CreateCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldTypes(fieldsSchema)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to create collection! Error: " 
+ ret.getMessage());
+        }
+
+        // Specify an index type on the vector field.
+        ret =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldName(VECTOR_FIELD)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: " + 
ret.getMessage());
+        }
+
+        // Call loadCollection() to enable automatically loading data into 
memory for searching
+        milvusClient.loadCollection(
+                
LoadCollectionParam.newBuilder().withCollectionName(COLLECTION_NAME).build());
+
+        log.info("Collection created");
+
+        // Insert 10 records into the collection
+        List<JsonObject> rows = new ArrayList<>();
+        for (long i = 1L; i <= 10; ++i) {
+
+            JsonObject row = new JsonObject();
+            row.add(ID_FIELD, gson.toJsonTree(i));
+            List<Float> vector = Arrays.asList((float) i, (float) i, (float) 
i, (float) i);
+            row.add(VECTOR_FIELD, gson.toJsonTree(vector));
+            row.addProperty(TITLE_FIELD, "Tom and Jerry " + i);
+            rows.add(row);
+        }
+
+        R<MutationResult> insertRet =
+                milvusClient.insert(
+                        InsertParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withRows(rows)
+                                .build());
+        if (insertRet.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to insert! Error: " + 
insertRet.getMessage());
+        }
+        log.info("Milvus test data created");
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        if (milvusClient != null) {
+            milvusClient.close();
+        }
+        if (dbServer != null) {
+            dbServer.close();
+        }
+        if (container != null) {
+            container.close();
+        }
+    }
+
+    @TestTemplate
+    public void testMilvusToOceanBase(TestContainer container) throws 
Exception {
+        try {
+            Container.ExecResult execResult = 
container.executeJob(configFile().get(0));
+            Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+        } finally {
+            clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
+        }
+    }
+
+    @TestTemplate
+    public void testFakeToOceanBase(TestContainer container)
+            throws IOException, InterruptedException {
+        try {
+            Container.ExecResult execResult = 
container.executeJob(configFile().get(1));
+            Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+        } finally {
+            clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
+        }
+    }
+
+    private void clearTable(String database, String schema, String table) {
+        clearTable(database, table);
+    }
+
+    public void clearTable(String schema, String table) {
+        try (Statement statement = connection.createStatement()) {
+            statement.execute("TRUNCATE TABLE " + 
buildTableInfoWithSchema(schema, table));
+            connection.commit();
+        } catch (SQLException e) {
+            try {
+                connection.rollback();
+            } catch (SQLException exception) {
+                throw new 
SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, exception);
+            }
+            throw new 
SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, e);
+        }
+    }
+
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl =
+                String.format(OCEANBASE_JDBC_TEMPLATE, 
dbServer.getMappedPort(PORT), "test");
+
+        return JdbcCase.builder()
+                .dockerImage(IMAGE)
+                .networkAliases(HOSTNAME)
+                .containerEnv(containerEnv)
+                .driverClass(OCEANBASE_DRIVER_CLASS)
+                .host(HOST)
+                .port(PORT)
+                .localPort(dbServer.getMappedPort(PORT))
+                .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
+                .jdbcUrl(jdbcUrl)
+                .userName(USERNAME)
+                .password(PASSWORD)
+                .database(OCEANBASE_DATABASE)
+                .sinkTable(OCEANBASE_SINK)
+                .createSql(createSqlTemplate())
+                .build();
+    }
+
+    List<String> configFile() {
+        return Lists.newArrayList(
+                "/jdbc_milvus_source_and_oceanbase_sink.conf", 
"/jdbc_fake_to_oceanbase_sink.conf");
+    }
+
+    private void initializeJdbcConnection(String jdbcUrl)
+            throws SQLException, InstantiationException, 
IllegalAccessException {
+        Driver driver = (Driver) loadDriverClass().newInstance();
+        Properties props = new Properties();
+
+        if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+            props.put("user", jdbcCase.getUserName());
+        }
+
+        if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+            props.put("password", jdbcCase.getPassword());
+        }
+
+        if (dbServer != null) {
+            jdbcUrl = jdbcUrl.replace(HOST, dbServer.getHost());
+        }
+
+        this.connection = driver.connect(jdbcUrl, props);
+        connection.setAutoCommit(false);
+    }
+
+    private Class<?> loadDriverClass() {
+        try {
+            return Class.forName(jdbcCase.getDriverClass());
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to load driver class: " + 
jdbcCase.getDriverClass(), e);
+        }
+    }
+
+    private void createSchemaIfNeeded() {
+        String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
+        try {
+            connection.prepareStatement(sql).executeUpdate();
+        } catch (Exception e) {
+            throw new SeaTunnelRuntimeException(
+                    JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql 
" + sql, e);
+        }
+        log.info("oceanbase schema created,sql is" + sql);
+    }
+
+    String createSqlTemplate() {
+        return "CREATE TABLE IF NOT EXISTS %s\n"
+                + "(\n"
+                + "book_id varchar(20) NOT NULL,\n"
+                + "book_intro vector(4) DEFAULT NULL,\n"
+                + "book_title varchar(64) DEFAULT NULL,\n"
+                + "primary key (book_id)\n"
+                + ");";
+    }
+
+    GenericContainer<?> initOceanbaseContainer() {
+        return new GenericContainer<>(IMAGE)
+                .withEnv("MODE", "slim")
+                .withEnv("OB_DATAFILE_SIZE", "2G")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOSTNAME)
+                .withExposedPorts(PORT)
+                .withImagePullPolicy(PullPolicy.alwaysPull())
+                .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
+                .withStartupTimeout(Duration.ofMinutes(5))
+                .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
+    }
+
+    private void createNeededTables() {
+        try (Statement statement = connection.createStatement()) {
+            String createTemplate = jdbcCase.getCreateSql();
+
+            if (!jdbcCase.isUseSaveModeCreateTable()) {
+                if (jdbcCase.getSinkCreateSql() != null) {
+                    createTemplate = jdbcCase.getSinkCreateSql();
+                }
+                String createSink =
+                        String.format(
+                                createTemplate,
+                                buildTableInfoWithSchema(
+                                        jdbcCase.getDatabase(),
+                                        jdbcCase.getSchema(),
+                                        jdbcCase.getSinkTable()));
+                statement.execute(createSink);
+                log.info("oceanbase table created,sql is" + createSink);
+            }
+
+            connection.commit();
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new 
SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+        }
+        log.info("oceanbase table created success!");
+    }
+
+    private String buildTableInfoWithSchema(String database, String schema, 
String table) {
+        return buildTableInfoWithSchema(database, table);
+    }
+
+    public String quoteIdentifier(String field) {
+        return "`" + field + "`";
+    }
+
+    public String buildTableInfoWithSchema(String schema, String table) {
+        if (StringUtils.isNotBlank(schema)) {
+            return quoteIdentifier(schema) + "." + quoteIdentifier(table);
+        } else {
+            return quoteIdentifier(table);
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_fake_to_oceanbase_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_fake_to_oceanbase_sink.conf
new file mode 100644
index 0000000000..4a5ae17cd0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_fake_to_oceanbase_sink.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+      row.num = 10
+      vector.dimension= 4
+      schema = {
+           table = "simple_example_1"
+           columns = [
+           {
+              name = book_id
+              type = bigint
+              nullable = false
+              defaultValue = 0
+              comment = "primary key id"
+           },
+           {
+              name = book_intro
+              type = float_vector
+              columnScale =4
+              comment = "vector"
+           },
+           {
+              name = book_title
+              type = string
+              nullable = true
+              comment = "topic"
+           }
+       ]
+        primaryKey {
+            name = book_id
+            columnNames = [book_id]
+        }
+      }
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:oceanbase://e2e_oceanbase_vector:2881/seatunnel"
+    driver = "com.oceanbase.jdbc.Driver"
+    user = "root@test"
+    password = ""
+    generate_sink_sql =true
+    compatible_mode="mysql"
+    database = "seatunnel"
+    table = "simple_example"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_milvus_source_and_oceanbase_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_milvus_source_and_oceanbase_sink.conf
new file mode 100644
index 0000000000..9d7ad806f4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_milvus_source_and_oceanbase_sink.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  Milvus {
+    url = "http://milvus-e2e:19530";
+    token = "root:Milvus"
+    database = "default"
+    collection="simple_example"
+  }
+}
+
+transform {
+}
+
+sink {
+  jdbc {
+    url = "jdbc:oceanbase://e2e_oceanbase_vector:2881/seatunnel"
+    driver = "com.oceanbase.jdbc.Driver"
+    user = "root@test"
+    password = ""
+    generate_sink_sql =true
+    compatible_mode="mysql"
+    database = "seatunnel"
+    table = "simple_example"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 802b1c32fb..7c2d6dda70 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -433,7 +433,9 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                 || threadName.contains("Timer for 's3a-file-system' metrics 
system")
                 || threadName.startsWith("MutableQuantiles-")
                 // JDBC Hana driver
-                || threadName.startsWith("Thread-");
+                || threadName.startsWith("Thread-")
+                // JNA Cleaner
+                || threadName.startsWith("JNA Cleaner");
     }
 
     @Override


Reply via email to