This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 528c901  [feature]Add kafka to doris container test case (#33)
528c901 is described below

commit 528c901202067e0b851a626917906337d61d2270
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Mon Jul 1 16:22:47 2024 +0800

    [feature]Add kafka to doris container test case (#33)
---
 .../workflows/kafka2doris-e2ecase.yaml             |  33 ++-
 .licenserc.yaml                                    |   1 +
 pom.xml                                            |  24 +++
 .../connector/e2e/doris/DorisContainerService.java |  29 +++
 .../e2e/doris/DorisContainerServiceImpl.java       | 188 ++++++++++++++++
 .../connector/e2e/kafka/KafkaContainerService.java |  37 ++++
 .../e2e/kafka/KafkaContainerServiceImpl.java       | 239 +++++++++++++++++++++
 .../e2e/sink/AbstractKafka2DorisSink.java          | 125 +++++++++++
 .../stringconverter/AbstractStringE2ESinkTest.java |  62 ++++++
 .../e2e/sink/stringconverter/StringMsgE2ETest.java |  95 ++++++++
 .../e2e/string_converter/string_msg_connector.json |  21 ++
 .../e2e/string_converter/string_msg_tab.sql        |  12 ++
 12 files changed, 857 insertions(+), 9 deletions(-)

diff --git a/.licenserc.yaml b/.github/workflows/kafka2doris-e2ecase.yaml
similarity index 61%
copy from .licenserc.yaml
copy to .github/workflows/kafka2doris-e2ecase.yaml
index ca50638..f30f5c1 100644
--- a/.licenserc.yaml
+++ b/.github/workflows/kafka2doris-e2ecase.yaml
@@ -15,15 +15,30 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
+---
+name: E2E Test CI
+on:
+  pull_request:
+  push:
 
-header:
-  license:
-    spdx-id: Apache-2.0
-    copyright-owner: Apache Software Foundation
+jobs:
+  build-extension:
+    name: "e2e test case"
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        shell: bash
+    steps:
+      - name: Checkout
+        uses: actions/checkout@master
 
-  paths-ignore:
-    - 'LICENSE'
-    - '.gitignore'
-    - 'src/test/resources/decode/avro/**'
+      - name: Setup java
+        uses: actions/setup-java@v2
+        with:
+          distribution: adopt
+          java-version: '8'
 
-  comment: on-failure
\ No newline at end of file
+      - name: Doris E2E Test
+        run: |
+          mvn test -Dtest='org.apache.doris.kafka.connector.e2e.**'
\ No newline at end of file
diff --git a/.licenserc.yaml b/.licenserc.yaml
index ca50638..07a811b 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -25,5 +25,6 @@ header:
     - 'LICENSE'
     - '.gitignore'
     - 'src/test/resources/decode/avro/**'
+    - 'src/test/resources/e2e/**'
 
   comment: on-failure
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 29005cb..fd99b7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
         <httpcomponents.version>4.5.13</httpcomponents.version>
         <commons-io.version>2.3</commons-io.version>
         <geometry.version>2.2.0</geometry.version>
+        <testcontainers.version>1.17.6</testcontainers.version>
     </properties>
 
     <repositories>
@@ -294,6 +295,18 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -422,6 +435,17 @@
                     </execution>
                 </executions>
             </plugin>
+            <!-- When using mvn to build a project, ignore packages under 
specific paths-->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.22.2</version>
+                <configuration>
+                    <excludes>
+                        
<exclude>**/org/apache/doris/kafka/connector/e2e/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
     <profiles>
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java
new file mode 100644
index 0000000..79ec94d
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.doris;
+
+public interface DorisContainerService {
+
+    void startContainer();
+
+    String getInstanceHost();
+
+    void close();
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
new file mode 100644
index 0000000..c328044
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.doris;
+
+import com.google.common.collect.Lists;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+public class DorisContainerServiceImpl implements DorisContainerService {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(DorisContainerServiceImpl.class);
+    protected static final String DORIS_DOCKER_IMAGE = 
"apache/doris:doris-all-in-one-2.1.0";
+    private static final String DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";;
+    protected static final String JDBC_URL = "jdbc:mysql://%s:9030";
+    protected static final String USERNAME = "root";
+    protected static final String PASSWORD = "";
+    private final GenericContainer dorisContainer;
+
+    public DorisContainerServiceImpl() {
+        dorisContainer = createDorisContainer();
+    }
+
+    public GenericContainer createDorisContainer() {
+        LOG.info("Will create doris containers.");
+        GenericContainer container =
+                new GenericContainer<>(DORIS_DOCKER_IMAGE)
+                        .withNetwork(Network.newNetwork())
+                        .withNetworkAliases("DorisContainer")
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
+                        .withExposedPorts(8030, 9030, 8040, 9060);
+
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", "8030", "8030"),
+                        String.format("%s:%s", "9030", "9030"),
+                        String.format("%s:%s", "9060", "9060"),
+                        String.format("%s:%s", "8040", "8040")));
+        return container;
+    }
+
+    public void startContainer() {
+        try {
+            LOG.info("Starting doris containers.");
+            // singleton doris container
+            dorisContainer.start();
+            initializeJdbcConnection();
+            printClusterStatus();
+        } catch (Exception ex) {
+            LOG.error("Failed to start containers doris", ex);
+            throw new DorisException("Failed to start containers doris", ex);
+        }
+        LOG.info("Doris container started successfully.");
+    }
+
+    @Override
+    public String getInstanceHost() {
+        return dorisContainer.getHost();
+    }
+
+    public void close() {
+        LOG.info("Doris container is about to be close.");
+        dorisContainer.close();
+        LOG.info("Doris container closed successfully.");
+    }
+
+    private void initializeJdbcConnection() throws Exception {
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(DRIVER_JAR)},
+                        DorisContainerServiceImpl.class.getClassLoader());
+        LOG.info("Try to connect to Doris.");
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(JDBC_URL, 
dorisContainer.getHost()),
+                                USERNAME,
+                                PASSWORD);
+                Statement statement = connection.createStatement()) {
+            ResultSet resultSet;
+            do {
+                LOG.info("Waiting for the Backend to start successfully.");
+                resultSet = statement.executeQuery("show backends");
+            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+        }
+        LOG.info("Connected to Doris successfully.");
+    }
+
+    private boolean isBeReady(ResultSet rs, Duration duration) throws 
SQLException {
+        LockSupport.parkNanos(duration.toNanos());
+        if (rs.next()) {
+            String isAlive = rs.getString("Alive").trim();
+            String totalCap = rs.getString("TotalCapacity").trim();
+            return "true".equalsIgnoreCase(isAlive) && 
!"0.000".equalsIgnoreCase(totalCap);
+        }
+        return false;
+    }
+
+    private void printClusterStatus() throws Exception {
+        LOG.info("Current machine IP: {}", dorisContainer.getHost());
+        echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
+        echo("sh", "-c", "free -h");
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                String.format(JDBC_URL, 
dorisContainer.getHost()),
+                                USERNAME,
+                                PASSWORD);
+                Statement statement = connection.createStatement()) {
+            ResultSet showFrontends = statement.executeQuery("show frontends");
+            LOG.info("Frontends status: {}", convertList(showFrontends));
+            ResultSet showBackends = statement.executeQuery("show backends");
+            LOG.info("Backends status: {}", convertList(showBackends));
+        }
+    }
+
+    private void echo(String... cmd) {
+        try {
+            Process p = Runtime.getRuntime().exec(cmd);
+            InputStream is = p.getInputStream();
+            BufferedReader reader = new BufferedReader(new 
InputStreamReader(is));
+            String line;
+            while ((line = reader.readLine()) != null) {
+                System.out.println(line);
+            }
+            p.waitFor();
+            is.close();
+            reader.close();
+            p.destroy();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private List<Map> convertList(ResultSet rs) throws SQLException {
+        List<Map> list = new ArrayList<>();
+        ResultSetMetaData metaData = rs.getMetaData();
+        int columnCount = metaData.getColumnCount();
+        while (rs.next()) {
+            Map<String, Object> rowData = new HashMap<>();
+            for (int i = 1; i <= columnCount; i++) {
+                rowData.put(metaData.getColumnName(i), rs.getObject(i));
+            }
+            list.add(rowData);
+        }
+        return list;
+    }
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
new file mode 100644
index 0000000..45616c4
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.kafka;
+
+import java.io.IOException;
+
+public interface KafkaContainerService {
+
+    void startContainer();
+
+    void startConnector();
+
+    String getInstanceHostAndPort();
+
+    void registerKafkaConnector(String name, String msg) throws IOException, 
InterruptedException;
+
+    void deleteKafkaConnector(String name);
+
+    void close();
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
new file mode 100644
index 0000000..083cdb2
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.kafka.connect.cli.ConnectDistributed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class KafkaContainerServiceImpl implements KafkaContainerService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaContainerServiceImpl.class);
+    private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.6.1";
+    private static final String CONNECT_PROPERTIES_PATH =
+            Objects.requireNonNull(
+                            KafkaContainerServiceImpl.class
+                                    .getClassLoader()
+                                    
.getResource("connect-distributed.properties"))
+                    .getPath();
+    private static final String NEW_CONNECT_PROPERTIES =
+            "src/test/resources/new-connect-distributed.properties";
+    private KafkaContainer kafkaContainer;
+    private final CloseableHttpClient httpClient = HttpClients.createDefault();
+    private String kafkaServerHost;
+    private int kafkaServerPort;
+    private static final String CONNECT_PORT = "8083";
+    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+    private final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+    private static final int MAX_RETRIES = 5;
+
+    @Override
+    public String getInstanceHostAndPort() {
+        return kafkaServerHost + ":" + kafkaServerPort;
+    }
+
+    public void startConnector() {
+        LOG.info("Doris-kafka-connect will be starting.");
+        try {
+            String[] params = new String[1];
+            params[0] = getConnectPropertiesPath();
+            // Start ConnectDistributed and run it in a separate thread to 
prevent blocking.
+            executorService.submit(() -> ConnectDistributed.main(params));
+            LOG.info("kafka-connect has been submitted to start.");
+            Thread.sleep(10000);
+        } catch (Exception e) {
+            LOG.error("Failed to start doris-kafka-connect.", e);
+        }
+        waitForKafkaConnect();
+    }
+
+    public void waitForKafkaConnect() {
+        String kafkaConnectUrl = "http://"; + kafkaServerHost + ":" + 
CONNECT_PORT + "/connectors";
+        int responseCode = -1;
+        int attempts = 0;
+        while (attempts < MAX_RETRIES) {
+            try {
+                URL url = new URL(kafkaConnectUrl);
+                HttpURLConnection connection = (HttpURLConnection) 
url.openConnection();
+                connection.setRequestMethod("GET");
+                connection.setConnectTimeout(5000);
+                connection.setReadTimeout(5000);
+
+                responseCode = connection.getResponseCode();
+                if (responseCode == 200) {
+                    LOG.info("doris-kafka-connect is up and running on " + 
kafkaConnectUrl);
+                    return;
+                }
+                LOG.info(
+                        "Received response code "
+                                + responseCode
+                                + ". Waiting for doris-kafka-connect to be 
ready.");
+            } catch (IOException e) {
+                LOG.info("Failed to connect to " + kafkaConnectUrl + ". 
Retrying...");
+            }
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOG.error("Thread interrupted while waiting for Kafka Connect 
to start.", e);
+            }
+            attempts++;
+        }
+        LOG.error("doris-kafka-connect did not start within " + MAX_RETRIES + 
" attempts.");
+    }
+
+    /**
+     * After the container containing the kafka server is started, the 
externally mapped port is not
+     * 9092. To keep the real kafka 9092 mapped port consistent with the 
connect registered port.
+     */
+    private String getConnectPropertiesPath() {
+        Properties properties = new Properties();
+        try (InputStream fis = 
Files.newInputStream(Paths.get(CONNECT_PROPERTIES_PATH))) {
+            properties.load(fis);
+        } catch (IOException e) {
+            throw new DorisException(
+                    "Failed to read " + CONNECT_PROPERTIES_PATH + "properties 
file.", e);
+        }
+        String bootstrapServers = kafkaServerHost + ":" + kafkaServerPort;
+        properties.put(BOOTSTRAP_SERVERS, bootstrapServers);
+        LOG.info("The bootstrap.servers set to {}", bootstrapServers);
+
+        try (OutputStream fos = 
Files.newOutputStream(Paths.get(NEW_CONNECT_PROPERTIES))) {
+            properties.store(fos, "Updated Kafka Connect Properties.");
+        } catch (IOException e) {
+            throw new DorisException("Failed to write properties file", e);
+        }
+        return NEW_CONNECT_PROPERTIES;
+    }
+
+    @Override
+    public void startContainer() {
+        LOG.info("kafka server is about to be initialized.");
+        kafkaContainer = new 
KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
+
+        kafkaContainer.start();
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            throw new DorisException(e);
+        }
+        kafkaServerHost = kafkaContainer.getHost();
+        kafkaServerPort = kafkaContainer.getMappedPort(9093);
+        LOG.info(
+                "kafka server started successfully. instance={}",
+                kafkaContainer.getBootstrapServers());
+        LOG.info(
+                "kafka server started successfully. instanceHost={}, 
instancePort={}",
+                kafkaServerHost,
+                kafkaServerPort);
+    }
+
+    @Override
+    public void close() {
+        LOG.info("Kafka server is about to be shut down.");
+        shutdownConnector();
+        kafkaContainer.close();
+        LOG.info("Kafka server shuts down successfully.");
+    }
+
+    private void shutdownConnector() {
+        try {
+            LOG.info("Shutting down ExecutorService.");
+            executorService.shutdown();
+            if (!executorService.awaitTermination(60, 
java.util.concurrent.TimeUnit.SECONDS)) {
+                executorService.shutdownNow();
+                if (!executorService.awaitTermination(60, 
java.util.concurrent.TimeUnit.SECONDS)) {
+                    LOG.error("ExecutorService did not terminate.");
+                }
+            }
+        } catch (InterruptedException ie) {
+            executorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void registerKafkaConnector(String name, String msg)
+            throws IOException, InterruptedException {
+        LOG.info("{} Kafka connector will be registering, bodyMsg={}", name, 
msg);
+        String connectUrl = "http://"; + kafkaServerHost + ":" + CONNECT_PORT + 
"/connectors";
+        HttpPost httpPost = new HttpPost(connectUrl);
+        StringEntity entity = new StringEntity(msg);
+        httpPost.setEntity(entity);
+        httpPost.setHeader("Content-type", "application/json");
+        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+            StatusLine statusLine = response.getStatusLine();
+            if (statusLine.getStatusCode() != 201) {
+                LOG.warn(
+                        "Failed to register {} kafka connect, msg={}",
+                        name,
+                        statusLine.getReasonPhrase());
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to delete kafka connect, name={}", name);
+        }
+        LOG.info("{} Kafka connector registered successfully.", name);
+
+        // The current thread sleeps for 10 seconds so that connect can 
consume messages to doris in
+        // time.
+        Thread.sleep(10000);
+    }
+
+    @Override
+    public void deleteKafkaConnector(String name) {
+        LOG.info("{} Kafka connector will be deleting.", name);
+        String connectUrl = "http://"; + kafkaServerHost + ":" + CONNECT_PORT + 
"/connectors/";
+        String deleteUrl = connectUrl + name;
+        HttpDelete httpDelete = new HttpDelete(deleteUrl);
+        try (CloseableHttpResponse response = httpClient.execute(httpDelete)) {
+            StatusLine statusLine = response.getStatusLine();
+            if (statusLine.getStatusCode() != 204) {
+                LOG.warn(
+                        "Failed to delete {} kafka connect, msg={}",
+                        name,
+                        statusLine.getReasonPhrase());
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to delete kafka connect, name={}", name);
+        }
+        LOG.info("{} Kafka connector deleted successfully.", name);
+    }
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
new file mode 100644
index 0000000..c4c7fe4
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.doris.kafka.connector.e2e.doris.DorisContainerService;
+import org.apache.doris.kafka.connector.e2e.doris.DorisContainerServiceImpl;
+import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerService;
+import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerServiceImpl;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractKafka2DorisSink {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractKafka2DorisSink.class);
+    protected static final String NAME = "name";
+    protected static final String CONFIG = "config";
+    private static final String JDBC_URL = "jdbc:mysql://%s:9030";
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private static String dorisInstanceHost;
+    protected static String kafkaInstanceHostAndPort;
+    protected static KafkaContainerService kafkaContainerService;
+    protected static ObjectMapper objectMapper = new ObjectMapper();
+    private static DorisContainerService dorisContainerService;
+
+    @BeforeClass
+    public static void initServer() {
+        initDorisBase();
+        initKafka();
+    }
+
+    private static void initKafka() {
+        if (Objects.nonNull(kafkaContainerService)) {
+            return;
+        }
+        kafkaContainerService = new KafkaContainerServiceImpl();
+        kafkaContainerService.startContainer();
+        kafkaContainerService.startConnector();
+        kafkaInstanceHostAndPort = 
kafkaContainerService.getInstanceHostAndPort();
+    }
+
+    protected static Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                String.format(JDBC_URL, dorisInstanceHost), USERNAME, 
PASSWORD);
+    }
+
+    protected static String loadContent(String path) {
+        try (InputStream stream = Files.newInputStream(Paths.get(path))) {
+            return new BufferedReader(new 
InputStreamReader(Objects.requireNonNull(stream)))
+                    .lines()
+                    .collect(Collectors.joining("\n"));
+        } catch (IOException e) {
+            throw new DorisException("Failed to read " + path + " file.", e);
+        }
+    }
+
+    protected static void createDatabase(String databaseName) {
+        LOG.info("Will to be create database, sql={}", databaseName);
+        try {
+            Statement statement = getJdbcConnection().createStatement();
+            statement.execute("create database if not exists " + databaseName);
+        } catch (SQLException e) {
+            throw new DorisException("Failed to create doris table.", e);
+        }
+        LOG.info("Create database successfully. databaseName={}", 
databaseName);
+    }
+
+    protected void createTable(String sql) {
+        LOG.info("Will to be create doris table, sql={}", sql);
+        try {
+            Statement statement = getJdbcConnection().createStatement();
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new DorisException("Failed to create doris table.", e);
+        }
+        LOG.info("Create doris table successfully. sql={}", sql);
+    }
+
+    private static void initDorisBase() {
+        if (Objects.nonNull(dorisContainerService)) {
+            return;
+        }
+        dorisContainerService = new DorisContainerServiceImpl();
+        dorisContainerService.startContainer();
+        dorisInstanceHost = dorisContainerService.getInstanceHost();
+    }
+
+    @AfterClass
+    public static void close() {
+        kafkaContainerService.close();
+        dorisContainerService.close();
+    }
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/AbstractStringE2ESinkTest.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/AbstractStringE2ESinkTest.java
new file mode 100644
index 0000000..acf9ac0
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/AbstractStringE2ESinkTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink.stringconverter;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.doris.kafka.connector.e2e.sink.AbstractKafka2DorisSink;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Produce messages using the {@link 
org.apache.kafka.common.serialization.StringSerializer}
+ * serializer, Use the {@link 
org.apache.kafka.connect.storage.StringConverter} converter to consume
+ * messages.
+ */
+public abstract class AbstractStringE2ESinkTest extends 
AbstractKafka2DorisSink {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractStringE2ESinkTest.class);
+    private static KafkaProducer<String, String> producer;
+
+    public static void initProducer() {
+        if (Objects.nonNull(producer)) {
+            return;
+        }
+        // Producer properties
+        Properties producerProperties = new Properties();
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaInstanceHostAndPort);
+        producerProperties.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        producerProperties.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        producer = new KafkaProducer<>(producerProperties);
+        LOG.info("kafka producer started successfully.");
+    }
+
+    protected void produceMsg2Kafka(String topic, String value) {
+        LOG.info("Kafka producer will produce msg. topic={}, msg={}", topic, 
value);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 
value);
+        producer.send(record);
+        LOG.info("Kafka producer produced msg successfully. topic={}, msg={}", 
topic, value);
+    }
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
new file mode 100644
index 0000000..9ab8891
--- /dev/null
+++ 
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink.stringconverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
+    private static String connectorName;
+    private static String jsonMsgConnectorContent;
+    private static DorisOptions dorisOptions;
+    private static String database;
+
+    @BeforeClass
+    public static void setUp() {
+        initServer();
+        initProducer();
+        initialize();
+    }
+
+    public static void initialize() {
+        jsonMsgConnectorContent =
+                
loadContent("src/test/resources/e2e/string_converter/string_msg_connector.json");
+        JsonNode rootNode = null;
+        try {
+            rootNode = objectMapper.readTree(jsonMsgConnectorContent);
+        } catch (IOException e) {
+            throw new DorisException("Failed to read content body.", e);
+        }
+        connectorName = rootNode.get(NAME).asText();
+        JsonNode configNode = rootNode.get(CONFIG);
+        Map<String, String> configMap = objectMapper.convertValue(configNode, 
Map.class);
+        configMap.put(ConfigCheckUtils.TASK_ID, "1");
+        Map<String, String> lowerCaseConfigMap =
+                DorisSinkConnectorConfig.convertToLowercase(configMap);
+        DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap);
+        dorisOptions = new DorisOptions(lowerCaseConfigMap);
+        database = dorisOptions.getDatabase();
+        createDatabase(database);
+    }
+
+    @Test
+    public void testStringMsg() throws IOException, InterruptedException, 
SQLException {
+        String topic = "string_test";
+        String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
+
+        produceMsg2Kafka(topic, msg);
+        String tableSql = 
loadContent("src/test/resources/e2e/string_converter/string_msg_tab.sql");
+        createTable(tableSql);
+        kafkaContainerService.registerKafkaConnector(connectorName, 
jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        Statement statement = getJdbcConnection().createStatement();
+        ResultSet resultSet = statement.executeQuery("select * from " + 
database + "." + table);
+        if (resultSet.next()) {
+            Assert.assertEquals(1, resultSet.getString("id"));
+            Assert.assertEquals("zhangsan", resultSet.getString("name"));
+            Assert.assertEquals(12, resultSet.getString("12"));
+        }
+    }
+
+    @AfterClass
+    public static void closeInstance() {
+        kafkaContainerService.deleteKafkaConnector(connectorName);
+    }
+}
diff --git a/src/test/resources/e2e/string_converter/string_msg_connector.json 
b/src/test/resources/e2e/string_converter/string_msg_connector.json
new file mode 100644
index 0000000..77340ea
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_connector.json
@@ -0,0 +1,21 @@
+{
+  "name":"string_msg_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"string_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": "string_test:string_msg_tab",
+    "buffer.count.records":"10",
+    "buffer.flush.time":"120",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"string_msg",
+    "load.model":"stream_load",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/string_msg_tab.sql 
b/src/test/resources/e2e/string_converter/string_msg_tab.sql
new file mode 100644
index 0000000..06ab78d
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_tab.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database 
in the file where the connector is registered.
+CREATE TABLE string_msg.string_msg_tab (
+  id INT NULL,
+  name VARCHAR(100) NULL,
+  age INT NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to