This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 528c901 [feature]Add kafka to doris container test case (#33) 528c901 is described below commit 528c901202067e0b851a626917906337d61d2270 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Mon Jul 1 16:22:47 2024 +0800 [feature]Add kafka to doris container test case (#33) --- .../workflows/kafka2doris-e2ecase.yaml | 33 ++- .licenserc.yaml | 1 + pom.xml | 24 +++ .../connector/e2e/doris/DorisContainerService.java | 29 +++ .../e2e/doris/DorisContainerServiceImpl.java | 188 ++++++++++++++++ .../connector/e2e/kafka/KafkaContainerService.java | 37 ++++ .../e2e/kafka/KafkaContainerServiceImpl.java | 239 +++++++++++++++++++++ .../e2e/sink/AbstractKafka2DorisSink.java | 125 +++++++++++ .../stringconverter/AbstractStringE2ESinkTest.java | 62 ++++++ .../e2e/sink/stringconverter/StringMsgE2ETest.java | 95 ++++++++ .../e2e/string_converter/string_msg_connector.json | 21 ++ .../e2e/string_converter/string_msg_tab.sql | 12 ++ 12 files changed, 857 insertions(+), 9 deletions(-) diff --git a/.licenserc.yaml b/.github/workflows/kafka2doris-e2ecase.yaml similarity index 61% copy from .licenserc.yaml copy to .github/workflows/kafka2doris-e2ecase.yaml index ca50638..f30f5c1 100644 --- a/.licenserc.yaml +++ b/.github/workflows/kafka2doris-e2ecase.yaml @@ -15,15 +15,30 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +# +--- +name: E2E Test CI +on: + pull_request: + push: -header: - license: - spdx-id: Apache-2.0 - copyright-owner: Apache Software Foundation +jobs: + build-extension: + name: "e2e test case" + runs-on: ubuntu-latest + defaults: + run: + shell: bash + steps: + - name: Checkout + uses: actions/checkout@master - paths-ignore: - - 'LICENSE' - - '.gitignore' - - 'src/test/resources/decode/avro/**' + - name: Setup java + uses: actions/setup-java@v2 + with: + distribution: adopt + java-version: '8' - comment: on-failure \ No newline at end of file + - name: Doris E2E Test + run: | + mvn test -Dtest='org.apache.doris.kafka.connector.e2e.**' \ No newline at end of file diff --git a/.licenserc.yaml b/.licenserc.yaml index ca50638..07a811b 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -25,5 +25,6 @@ header: - 'LICENSE' - '.gitignore' - 'src/test/resources/decode/avro/**' + - 'src/test/resources/e2e/**' comment: on-failure \ No newline at end of file diff --git a/pom.xml b/pom.xml index 29005cb..fd99b7d 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ <httpcomponents.version>4.5.13</httpcomponents.version> <commons-io.version>2.3</commons-io.version> <geometry.version>2.2.0</geometry.version> + <testcontainers.version>1.17.6</testcontainers.version> </properties> <repositories> @@ -294,6 +295,18 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -422,6 +435,17 @@ </execution> </executions> </plugin> + <!-- When using mvn to build a project, ignore packages under specific paths--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.22.2</version> + <configuration> + <excludes> + <exclude>**/org/apache/doris/kafka/connector/e2e/**</exclude> + </excludes> + </configuration> + </plugin> </plugins> </build> <profiles> diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java new file mode 100644 index 0000000..79ec94d --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.doris; + +public interface DorisContainerService { + + void startContainer(); + + String getInstanceHost(); + + void close(); +} diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java new file mode 100644 index 0000000..c328044 --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.doris; + +import com.google.common.collect.Lists; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.LockSupport; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerLoggerFactory; + +public class DorisContainerServiceImpl implements DorisContainerService { + protected static final Logger LOG = LoggerFactory.getLogger(DorisContainerServiceImpl.class); + protected static final String DORIS_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0"; + private static final String DRIVER_JAR = + "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; + protected static final String JDBC_URL = "jdbc:mysql://%s:9030"; + protected static final String USERNAME = "root"; + protected static final String PASSWORD = ""; + private final GenericContainer dorisContainer; + + public DorisContainerServiceImpl() { + dorisContainer = createDorisContainer(); + } + + public GenericContainer createDorisContainer() { + LOG.info("Will create doris containers."); + GenericContainer container = + new GenericContainer<>(DORIS_DOCKER_IMAGE) + .withNetwork(Network.newNetwork()) + .withNetworkAliases("DorisContainer") + .withPrivilegedMode(true) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE))) + .withExposedPorts(8030, 9030, 8040, 9060); + + container.setPortBindings( + Lists.newArrayList( + String.format("%s:%s", "8030", "8030"), + String.format("%s:%s", "9030", "9030"), + String.format("%s:%s", "9060", "9060"), + String.format("%s:%s", "8040", "8040"))); + return container; + } + + public void startContainer() { + try { + LOG.info("Starting doris containers."); + // singleton doris container + dorisContainer.start(); + initializeJdbcConnection(); + printClusterStatus(); + } catch (Exception ex) { + LOG.error("Failed to start containers doris", ex); + throw new DorisException("Failed to start containers doris", ex); + } + LOG.info("Doris container started successfully."); + } + + @Override + public String getInstanceHost() { + return dorisContainer.getHost(); + } + + public void close() { + LOG.info("Doris container is about to be close."); + dorisContainer.close(); + LOG.info("Doris container closed successfully."); + } + + private void initializeJdbcConnection() throws Exception { + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(DRIVER_JAR)}, + DorisContainerServiceImpl.class.getClassLoader()); + LOG.info("Try to connect to Doris."); + Thread.currentThread().setContextClassLoader(urlClassLoader); + try (Connection connection = + DriverManager.getConnection( + String.format(JDBC_URL, dorisContainer.getHost()), + USERNAME, + PASSWORD); + Statement statement = connection.createStatement()) { + ResultSet resultSet; + do { + LOG.info("Waiting for the Backend to start successfully."); + resultSet = statement.executeQuery("show backends"); + } while (!isBeReady(resultSet, Duration.ofSeconds(1L))); + } + LOG.info("Connected to Doris successfully."); + } + + private boolean isBeReady(ResultSet rs, Duration duration) throws SQLException { + LockSupport.parkNanos(duration.toNanos()); + if (rs.next()) { + String isAlive = rs.getString("Alive").trim(); + String totalCap = rs.getString("TotalCapacity").trim(); + return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap); + } + return false; + } + + private void printClusterStatus() throws Exception { + LOG.info("Current machine IP: {}", dorisContainer.getHost()); + echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq"); + echo("sh", "-c", "free -h"); + try (Connection connection = + DriverManager.getConnection( + String.format(JDBC_URL, dorisContainer.getHost()), + USERNAME, + PASSWORD); + Statement statement = connection.createStatement()) { + ResultSet showFrontends = statement.executeQuery("show frontends"); + LOG.info("Frontends status: {}", convertList(showFrontends)); + ResultSet showBackends = statement.executeQuery("show backends"); + LOG.info("Backends status: {}", convertList(showBackends)); + } + } + + private void echo(String... cmd) { + try { + Process p = Runtime.getRuntime().exec(cmd); + InputStream is = p.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + p.waitFor(); + is.close(); + reader.close(); + p.destroy(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private List<Map> convertList(ResultSet rs) throws SQLException { + List<Map> list = new ArrayList<>(); + ResultSetMetaData metaData = rs.getMetaData(); + int columnCount = metaData.getColumnCount(); + while (rs.next()) { + Map<String, Object> rowData = new HashMap<>(); + for (int i = 1; i <= columnCount; i++) { + rowData.put(metaData.getColumnName(i), rs.getObject(i)); + } + list.add(rowData); + } + return list; + } +} diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java new file mode 100644 index 0000000..45616c4 --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.kafka; + +import java.io.IOException; + +public interface KafkaContainerService { + + void startContainer(); + + void startConnector(); + + String getInstanceHostAndPort(); + + void registerKafkaConnector(String name, String msg) throws IOException, InterruptedException; + + void deleteKafkaConnector(String name); + + void close(); +} diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java new file mode 100644 index 0000000..083cdb2 --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.kafka; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.kafka.connect.cli.ConnectDistributed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +public class KafkaContainerServiceImpl implements KafkaContainerService { + private static final Logger LOG = LoggerFactory.getLogger(KafkaContainerServiceImpl.class); + private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.6.1"; + private static final String CONNECT_PROPERTIES_PATH = + Objects.requireNonNull( + KafkaContainerServiceImpl.class + .getClassLoader() + .getResource("connect-distributed.properties")) + .getPath(); + private static final String NEW_CONNECT_PROPERTIES = + "src/test/resources/new-connect-distributed.properties"; + private KafkaContainer kafkaContainer; + private final CloseableHttpClient httpClient = HttpClients.createDefault(); + private String kafkaServerHost; + private int kafkaServerPort; + private static final String CONNECT_PORT = "8083"; + private static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private static final int MAX_RETRIES = 5; + + @Override + public String getInstanceHostAndPort() { + return kafkaServerHost + ":" + kafkaServerPort; + } + + public void startConnector() { + LOG.info("Doris-kafka-connect will be starting."); + try { + String[] params = new String[1]; + params[0] = getConnectPropertiesPath(); + // Start ConnectDistributed and run it in a separate thread to prevent blocking. + executorService.submit(() -> ConnectDistributed.main(params)); + LOG.info("kafka-connect has been submitted to start."); + Thread.sleep(10000); + } catch (Exception e) { + LOG.error("Failed to start doris-kafka-connect.", e); + } + waitForKafkaConnect(); + } + + public void waitForKafkaConnect() { + String kafkaConnectUrl = "http://" + kafkaServerHost + ":" + CONNECT_PORT + "/connectors"; + int responseCode = -1; + int attempts = 0; + while (attempts < MAX_RETRIES) { + try { + URL url = new URL(kafkaConnectUrl); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setConnectTimeout(5000); + connection.setReadTimeout(5000); + + responseCode = connection.getResponseCode(); + if (responseCode == 200) { + LOG.info("doris-kafka-connect is up and running on " + kafkaConnectUrl); + return; + } + LOG.info( + "Received response code " + + responseCode + + ". Waiting for doris-kafka-connect to be ready."); + } catch (IOException e) { + LOG.info("Failed to connect to " + kafkaConnectUrl + ". Retrying..."); + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Thread interrupted while waiting for Kafka Connect to start.", e); + } + attempts++; + } + LOG.error("doris-kafka-connect did not start within " + MAX_RETRIES + " attempts."); + } + + /** + * After the container containing the kafka server is started, the externally mapped port is not + * 9092. To keep the real kafka 9092 mapped port consistent with the connect registered port. + */ + private String getConnectPropertiesPath() { + Properties properties = new Properties(); + try (InputStream fis = Files.newInputStream(Paths.get(CONNECT_PROPERTIES_PATH))) { + properties.load(fis); + } catch (IOException e) { + throw new DorisException( + "Failed to read " + CONNECT_PROPERTIES_PATH + "properties file.", e); + } + String bootstrapServers = kafkaServerHost + ":" + kafkaServerPort; + properties.put(BOOTSTRAP_SERVERS, bootstrapServers); + LOG.info("The bootstrap.servers set to {}", bootstrapServers); + + try (OutputStream fos = Files.newOutputStream(Paths.get(NEW_CONNECT_PROPERTIES))) { + properties.store(fos, "Updated Kafka Connect Properties."); + } catch (IOException e) { + throw new DorisException("Failed to write properties file", e); + } + return NEW_CONNECT_PROPERTIES; + } + + @Override + public void startContainer() { + LOG.info("kafka server is about to be initialized."); + kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)); + + kafkaContainer.start(); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new DorisException(e); + } + kafkaServerHost = kafkaContainer.getHost(); + kafkaServerPort = kafkaContainer.getMappedPort(9093); + LOG.info( + "kafka server started successfully. instance={}", + kafkaContainer.getBootstrapServers()); + LOG.info( + "kafka server started successfully. instanceHost={}, instancePort={}", + kafkaServerHost, + kafkaServerPort); + } + + @Override + public void close() { + LOG.info("Kafka server is about to be shut down."); + shutdownConnector(); + kafkaContainer.close(); + LOG.info("Kafka server shuts down successfully."); + } + + private void shutdownConnector() { + try { + LOG.info("Shutting down ExecutorService."); + executorService.shutdown(); + if (!executorService.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) { + executorService.shutdownNow(); + if (!executorService.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) { + LOG.error("ExecutorService did not terminate."); + } + } + } catch (InterruptedException ie) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + @Override + public void registerKafkaConnector(String name, String msg) + throws IOException, InterruptedException { + LOG.info("{} Kafka connector will be registering, bodyMsg={}", name, msg); + String connectUrl = "http://" + kafkaServerHost + ":" + CONNECT_PORT + "/connectors"; + HttpPost httpPost = new HttpPost(connectUrl); + StringEntity entity = new StringEntity(msg); + httpPost.setEntity(entity); + httpPost.setHeader("Content-type", "application/json"); + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + StatusLine statusLine = response.getStatusLine(); + if (statusLine.getStatusCode() != 201) { + LOG.warn( + "Failed to register {} kafka connect, msg={}", + name, + statusLine.getReasonPhrase()); + } + } catch (IOException e) { + LOG.warn("Failed to delete kafka connect, name={}", name); + } + LOG.info("{} Kafka connector registered successfully.", name); + + // The current thread sleeps for 10 seconds so that connect can consume messages to doris in + // time. + Thread.sleep(10000); + } + + @Override + public void deleteKafkaConnector(String name) { + LOG.info("{} Kafka connector will be deleting.", name); + String connectUrl = "http://" + kafkaServerHost + ":" + CONNECT_PORT + "/connectors/"; + String deleteUrl = connectUrl + name; + HttpDelete httpDelete = new HttpDelete(deleteUrl); + try (CloseableHttpResponse response = httpClient.execute(httpDelete)) { + StatusLine statusLine = response.getStatusLine(); + if (statusLine.getStatusCode() != 204) { + LOG.warn( + "Failed to delete {} kafka connect, msg={}", + name, + statusLine.getReasonPhrase()); + } + } catch (IOException e) { + LOG.warn("Failed to delete kafka connect, name={}", name); + } + LOG.info("{} Kafka connector deleted successfully.", name); + } +} diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java new file mode 100644 index 0000000..c4c7fe4 --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.sink; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.doris.kafka.connector.e2e.doris.DorisContainerService; +import org.apache.doris.kafka.connector.e2e.doris.DorisContainerServiceImpl; +import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerService; +import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerServiceImpl; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractKafka2DorisSink { + private static final Logger LOG = LoggerFactory.getLogger(AbstractKafka2DorisSink.class); + protected static final String NAME = "name"; + protected static final String CONFIG = "config"; + private static final String JDBC_URL = "jdbc:mysql://%s:9030"; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private static String dorisInstanceHost; + protected static String kafkaInstanceHostAndPort; + protected static KafkaContainerService kafkaContainerService; + protected static ObjectMapper objectMapper = new ObjectMapper(); + private static DorisContainerService dorisContainerService; + + @BeforeClass + public static void initServer() { + initDorisBase(); + initKafka(); + } + + private static void initKafka() { + if (Objects.nonNull(kafkaContainerService)) { + return; + } + kafkaContainerService = new KafkaContainerServiceImpl(); + kafkaContainerService.startContainer(); + kafkaContainerService.startConnector(); + kafkaInstanceHostAndPort = kafkaContainerService.getInstanceHostAndPort(); + } + + protected static Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + String.format(JDBC_URL, dorisInstanceHost), USERNAME, PASSWORD); + } + + protected static String loadContent(String path) { + try (InputStream stream = Files.newInputStream(Paths.get(path))) { + return new BufferedReader(new InputStreamReader(Objects.requireNonNull(stream))) + .lines() + .collect(Collectors.joining("\n")); + } catch (IOException e) { + throw new DorisException("Failed to read " + path + " file.", e); + } + } + + protected static void createDatabase(String databaseName) { + LOG.info("Will to be create database, sql={}", databaseName); + try { + Statement statement = getJdbcConnection().createStatement(); + statement.execute("create database if not exists " + databaseName); + } catch (SQLException e) { + throw new DorisException("Failed to create doris table.", e); + } + LOG.info("Create database successfully. databaseName={}", databaseName); + } + + protected void createTable(String sql) { + LOG.info("Will to be create doris table, sql={}", sql); + try { + Statement statement = getJdbcConnection().createStatement(); + statement.execute(sql); + } catch (SQLException e) { + throw new DorisException("Failed to create doris table.", e); + } + LOG.info("Create doris table successfully. sql={}", sql); + } + + private static void initDorisBase() { + if (Objects.nonNull(dorisContainerService)) { + return; + } + dorisContainerService = new DorisContainerServiceImpl(); + dorisContainerService.startContainer(); + dorisInstanceHost = dorisContainerService.getInstanceHost(); + } + + @AfterClass + public static void close() { + kafkaContainerService.close(); + dorisContainerService.close(); + } +} diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/AbstractStringE2ESinkTest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/AbstractStringE2ESinkTest.java new file mode 100644 index 0000000..acf9ac0 --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/AbstractStringE2ESinkTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.sink.stringconverter; + +import java.util.Objects; +import java.util.Properties; +import org.apache.doris.kafka.connector.e2e.sink.AbstractKafka2DorisSink; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Produce messages using the {@link org.apache.kafka.common.serialization.StringSerializer} + * serializer, Use the {@link org.apache.kafka.connect.storage.StringConverter} converter to consume + * messages. + */ +public abstract class AbstractStringE2ESinkTest extends AbstractKafka2DorisSink { + private static final Logger LOG = LoggerFactory.getLogger(AbstractStringE2ESinkTest.class); + private static KafkaProducer<String, String> producer; + + public static void initProducer() { + if (Objects.nonNull(producer)) { + return; + } + // Producer properties + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInstanceHostAndPort); + producerProperties.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producer = new KafkaProducer<>(producerProperties); + LOG.info("kafka producer started successfully."); + } + + protected void produceMsg2Kafka(String topic, String value) { + LOG.info("Kafka producer will produce msg. topic={}, msg={}", topic, value); + ProducerRecord<String, String> record = new ProducerRecord<>(topic, value); + producer.send(record); + LOG.info("Kafka producer produced msg successfully. topic={}, msg={}", topic, value); + } +} diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java new file mode 100644 index 0000000..9ab8891 --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.sink.stringconverter; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.doris.kafka.connector.utils.ConfigCheckUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class StringMsgE2ETest extends AbstractStringE2ESinkTest { + private static String connectorName; + private static String jsonMsgConnectorContent; + private static DorisOptions dorisOptions; + private static String database; + + @BeforeClass + public static void setUp() { + initServer(); + initProducer(); + initialize(); + } + + public static void initialize() { + jsonMsgConnectorContent = + loadContent("src/test/resources/e2e/string_converter/string_msg_connector.json"); + JsonNode rootNode = null; + try { + rootNode = objectMapper.readTree(jsonMsgConnectorContent); + } catch (IOException e) { + throw new DorisException("Failed to read content body.", e); + } + connectorName = rootNode.get(NAME).asText(); + JsonNode configNode = rootNode.get(CONFIG); + Map<String, String> configMap = objectMapper.convertValue(configNode, Map.class); + configMap.put(ConfigCheckUtils.TASK_ID, "1"); + Map<String, String> lowerCaseConfigMap = + DorisSinkConnectorConfig.convertToLowercase(configMap); + DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap); + dorisOptions = new DorisOptions(lowerCaseConfigMap); + database = dorisOptions.getDatabase(); + createDatabase(database); + } + + @Test + public void testStringMsg() throws IOException, InterruptedException, SQLException { + String topic = "string_test"; + String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}"; + + produceMsg2Kafka(topic, msg); + String tableSql = loadContent("src/test/resources/e2e/string_converter/string_msg_tab.sql"); + createTable(tableSql); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + String table = dorisOptions.getTopicMapTable(topic); + Statement statement = getJdbcConnection().createStatement(); + ResultSet resultSet = statement.executeQuery("select * from " + database + "." + table); + if (resultSet.next()) { + Assert.assertEquals(1, resultSet.getString("id")); + Assert.assertEquals("zhangsan", resultSet.getString("name")); + Assert.assertEquals(12, resultSet.getString("12")); + } + } + + @AfterClass + public static void closeInstance() { + kafkaContainerService.deleteKafkaConnector(connectorName); + } +} diff --git a/src/test/resources/e2e/string_converter/string_msg_connector.json b/src/test/resources/e2e/string_converter/string_msg_connector.json new file mode 100644 index 0000000..77340ea --- /dev/null +++ b/src/test/resources/e2e/string_converter/string_msg_connector.json @@ -0,0 +1,21 @@ +{ + "name":"string_msg_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"string_test", + "tasks.max":"1", + "doris.topic2table.map": "string_test:string_msg_tab", + "buffer.count.records":"10", + "buffer.flush.time":"120", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"string_msg", + "load.model":"stream_load", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/string_msg_tab.sql b/src/test/resources/e2e/string_converter/string_msg_tab.sql new file mode 100644 index 0000000..06ab78d --- /dev/null +++ b/src/test/resources/e2e/string_converter/string_msg_tab.sql @@ -0,0 +1,12 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. +CREATE TABLE string_msg.string_msg_tab ( + id INT NULL, + name VARCHAR(100) NULL, + age INT NULL +) ENGINE=OLAP +UNIQUE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org