This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8d81331 [Improve] add retry strategy (#67)
8d81331 is described below
commit 8d81331fc5bfe752c41f9ff0dbb64bc36c5a91f5
Author: wudi <[email protected]>
AuthorDate: Fri Apr 11 16:11:58 2025 +0800
[Improve] add retry strategy (#67)
---
.../doris/kafka/connector/DorisSinkTask.java | 22 ++-
.../doris/kafka/connector/cfg/DorisOptions.java | 21 +++
.../connector/cfg/DorisSinkConnectorConfig.java | 23 +++-
.../doris/kafka/connector/model/RespContent.java | 4 +
.../kafka/connector/utils/ConfigCheckUtils.java | 16 +++
.../doris/kafka/connector/writer/DorisWriter.java | 3 +-
.../connector/writer/load/DorisStreamLoad.java | 3 +
.../cfg/TestDorisSinkConnectorConfig.java | 14 ++
.../e2e/doris/DorisContainerServiceImpl.java | 14 +-
.../e2e/sink/AbstractKafka2DorisSink.java | 55 ++++++++
.../stringconverter/DorisSinkFailoverSinkTest.java | 149 +++++++++++++++++++++
src/test/resources/docker/doris/be.conf | 99 ++++++++++++++
src/test/resources/docker/doris/fe.conf | 74 ++++++++++
.../string_msg_failover_connector.json | 25 ++++
.../string_converter/string_msg_tab_failover.sql | 12 ++
15 files changed, 528 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
index f793d26..e83f033 100644
--- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
+++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java
@@ -22,6 +22,7 @@ package org.apache.doris.kafka.connector;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.service.DorisSinkService;
import org.apache.doris.kafka.connector.service.DorisSinkServiceFactory;
import org.apache.doris.kafka.connector.utils.Version;
@@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory;
public class DorisSinkTask extends SinkTask {
private static final Logger LOG =
LoggerFactory.getLogger(DorisSinkTask.class);
private DorisSinkService sink = null;
+ // Connector options parsed in start(); supplies the retry settings consulted by put().
+ private DorisOptions options;
+ // Retries left before put() rethrows an insert failure instead of a RetriableException.
+ private int remainingRetries;
/** default constructor, invoked by kafka connect framework */
public DorisSinkTask() {}
@@ -49,7 +52,9 @@ public class DorisSinkTask extends SinkTask {
*/
@Override
public void start(final Map<String, String> parsedConfig) {
- LOG.info("kafka doris sink task start");
+ // Do not log parsedConfig verbatim: it carries credentials such as doris.password.
+ this.options = new DorisOptions(parsedConfig);
+ this.remainingRetries = options.getMaxRetries();
+ LOG.info(
+ "kafka doris sink task start, maxRetries={}, retryIntervalMs={}",
+ options.getMaxRetries(),
+ options.getRetryIntervalMs());
this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig);
}
@@ -94,7 +99,20 @@ public class DorisSinkTask extends SinkTask {
@Override
public void put(final Collection<SinkRecord> records) {
LOG.info("Read {} records from Kafka", records.size());
- sink.insert(records);
+ try {
+ sink.insert(records);
+ // A successful write ends the failure episode: restore the full retry budget so
+ // unrelated transient failures do not accumulate over the task's lifetime.
+ remainingRetries = options.getMaxRetries();
+ } catch (Exception ex) {
+ LOG.error("Error inserting records to Doris", ex);
+ if (remainingRetries > 0) {
+ LOG.info(
+ "Retrying to insert records to Doris, remaining retries: {}",
+ remainingRetries);
+ remainingRetries--;
+ // Ask the framework to back off before it redelivers the same batch to put().
+ context.timeout(options.getRetryIntervalMs());
+ throw new RetriableException(ex);
+ }
+ // Retry budget exhausted: surface the original failure so the task fails.
+ throw ex;
+ }
}
/**
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 9afd9cb..c1a2cbd 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -66,6 +66,8 @@ public class DorisOptions {
private final DeliveryGuarantee deliveryGuarantee;
private final ConverterMode converterMode;
private final SchemaEvolutionMode schemaEvolutionMode;
+ // Parsed from max.retries (DorisSinkConnectorConfig.MAX_RETRIES).
+ private final int maxRetries;
+ // Parsed from retry.interval.ms (DorisSinkConnectorConfig.RETRY_INTERVAL_MS), in milliseconds.
+ private final int retryIntervalMs;
public DorisOptions(Map<String, String> config) {
this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -127,6 +129,17 @@ public class DorisOptions {
}
this.streamLoadProp = getStreamLoadPropFromConfig(config);
this.enableGroupCommit =
ConfigCheckUtils.validateGroupCommitMode(this);
+ this.maxRetries =
+ Integer.parseInt(
+ config.getOrDefault(
+ DorisSinkConnectorConfig.MAX_RETRIES,
+
String.valueOf(DorisSinkConnectorConfig.MAX_RETRIES_DEFAULT)));
+ this.retryIntervalMs =
+ Integer.parseInt(
+ config.getOrDefault(
+ DorisSinkConnectorConfig.RETRY_INTERVAL_MS,
+ String.valueOf(
+
DorisSinkConnectorConfig.RETRY_INTERVAL_MS_DEFAULT)));
}
private Properties getStreamLoadPropFromConfig(Map<String, String> config)
{
@@ -320,4 +333,12 @@ public class DorisOptions {
public boolean isEnableDelete() {
return enableDelete;
}
+
+ /** Returns {@code max.retries}: the maximum number of times to retry on errors before failing the task. */
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ /** Returns {@code retry.interval.ms}: milliseconds to wait after an error before a retry attempt. */
+ public int getRetryIntervalMs() {
+ return retryIntervalMs;
+ }
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 0dd7315..53442ca 100644
---
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -89,6 +89,12 @@ public class DorisSinkConnectorConfig {
public static final String DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT =
SchemaEvolutionMode.NONE.getName();
+ // Maximum number of times to retry on errors before failing the task.
+ public static final String MAX_RETRIES = "max.retries";
+ public static final int MAX_RETRIES_DEFAULT = 10;
+
+ // Time in milliseconds to wait following an error before a retry attempt is made.
+ public static final String RETRY_INTERVAL_MS = "retry.interval.ms";
+ public static final int RETRY_INTERVAL_MS_DEFAULT = 6000;
+
// metrics
public static final String JMX_OPT = "jmx";
public static final boolean JMX_OPT_DEFAULT = true;
@@ -116,6 +122,9 @@ public class DorisSinkConnectorConfig {
setFieldToDefaultValues(
config, DEBEZIUM_SCHEMA_EVOLUTION,
DEBEZIUM_SCHEMA_EVOLUTION_DEFAULT);
setFieldToDefaultValues(config, JMX_OPT,
String.valueOf(JMX_OPT_DEFAULT));
+ setFieldToDefaultValues(config, MAX_RETRIES,
String.valueOf(MAX_RETRIES_DEFAULT));
+ setFieldToDefaultValues(
+ config, RETRY_INTERVAL_MS,
String.valueOf(RETRY_INTERVAL_MS_DEFAULT));
}
public static Map<String, String> convertToLowercase(Map<String, String>
config) {
@@ -270,7 +279,19 @@ public class DorisSinkConnectorConfig {
Type.STRING,
LOAD_MODEL_DEFAULT,
Importance.HIGH,
- "load model is stream_load.");
+ "load model is stream_load.")
+ .define(
+ MAX_RETRIES,
+ Type.INT,
+ MAX_RETRIES_DEFAULT,
+ Importance.MEDIUM,
+ "The maximum number of times to retry on errors before
failing the task.")
+ .define(
+ RETRY_INTERVAL_MS,
+ Type.INT,
+ RETRY_INTERVAL_MS_DEFAULT,
+ Importance.MEDIUM,
+ "The time in milliseconds to wait following an error
before a retry attempt is made.");
}
public static class TopicToTableValidator implements ConfigDef.Validator {
diff --git
a/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java
b/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java
index 5d29337..b66c416 100644
--- a/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java
+++ b/src/main/java/org/apache/doris/kafka/connector/model/RespContent.java
@@ -97,6 +97,10 @@ public class RespContent {
return message;
}
+ /** Returns the {@code label} field parsed from the Doris load response. */
+ public String getLabel() {
+ return label;
+ }
+
public String getExistingJobStatus() {
return existingJobStatus;
}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index c6611bc..d7a299f 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -184,6 +184,22 @@ public class ConfigCheckUtils {
configIsValid = false;
}
+ String maxRetries = config.get(DorisSinkConnectorConfig.MAX_RETRIES);
+ if (!isNumeric(maxRetries) || isIllegalRange(maxRetries, 0)) {
+ LOG.error(
+ "{} cannot be empty or not a number or less than 0.",
+ DorisSinkConnectorConfig.MAX_RETRIES);
+ configIsValid = false;
+ }
+
+ String retryIntervalMs =
config.get(DorisSinkConnectorConfig.RETRY_INTERVAL_MS);
+ if (!isNumeric(retryIntervalMs) || isIllegalRange(retryIntervalMs, 0))
{
+ LOG.error(
+ "{} cannot be empty or not a number or less than 0.",
+ DorisSinkConnectorConfig.RETRY_INTERVAL_MS);
+ configIsValid = false;
+ }
+
if (!configIsValid) {
throw new DorisException(
"input kafka connector configuration is null, missing
required values, or wrong input value");
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 9481a2f..348f784 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -120,7 +120,7 @@ public abstract class DorisWriter {
&& record.kafkaOffset() > processedOffset.get()) {
SinkRecord dorisRecord = record;
RecordBuffer tmpBuff = null;
- processedOffset.set(dorisRecord.kafkaOffset());
+
putBuffer(dorisRecord);
if (buffer.getBufferSizeBytes() >= dorisOptions.getFileSize()
|| (dorisOptions.getRecordNum() != 0
@@ -132,6 +132,7 @@ public abstract class DorisWriter {
if (tmpBuff != null) {
flush(tmpBuff);
}
+ processedOffset.set(dorisRecord.kafkaOffset());
} else {
LOG.warn(
"The record offset is smaller than processedOffset.
recordOffset={}, offsetPersistedInDoris={}, processedOffset={}",
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
index 9ab4b8d..f3a3a62 100644
---
a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
@@ -102,6 +102,9 @@ public class DorisStreamLoad extends DataLoad {
LOG.info("load Result {}", loadResult);
KafkaRespContent respContent =
OBJECT_MAPPER.readValue(loadResult,
KafkaRespContent.class);
+ if (respContent == null || respContent.getMessage() == null) {
+ throw new StreamLoadException("response error : " +
loadResult);
+ }
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg =
String.format(
diff --git
a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index 9e46cc1..5caa668 100644
---
a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++
b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -260,6 +260,20 @@ public class TestDorisSinkConnectorConfig {
ConfigCheckUtils.validateConfig(config);
}
+ /** A non-numeric max.retries value must be rejected by ConfigCheckUtils.validateConfig. */
+ @Test(expected = DorisException.class)
+ public void testMaxRetryException() {
+ Map<String, String> config = getConfig();
+ config.put(DorisSinkConnectorConfig.MAX_RETRIES, "abc");
+ ConfigCheckUtils.validateConfig(config);
+ }
+
+ /** A non-numeric retry.interval.ms value must be rejected by ConfigCheckUtils.validateConfig. */
+ @Test(expected = DorisException.class)
+ public void testRetryIntervalMsException() {
+ Map<String, String> config = getConfig();
+ config.put(DorisSinkConnectorConfig.RETRY_INTERVAL_MS, "abc");
+ ConfigCheckUtils.validateConfig(config);
+ }
+
@Test
public void testSchemaEvolutionMode() {
Map<String, String> config = getConfig();
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
index c328044..e626538 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/doris/DorisContainerServiceImpl.java
@@ -44,6 +44,7 @@ import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
public class DorisContainerServiceImpl implements DorisContainerService {
protected static final Logger LOG =
LoggerFactory.getLogger(DorisContainerServiceImpl.class);
@@ -69,14 +70,23 @@ public class DorisContainerServiceImpl implements
DorisContainerService {
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
- .withExposedPorts(8030, 9030, 8040, 9060);
+ // use custom be.conf/fe.conf (debug points enabled, Arrow Flight SQL ports configured)
+ .withCopyFileToContainer(
+
MountableFile.forClasspathResource("docker/doris/be.conf"),
+ "/opt/apache-doris/be/conf/be.conf")
+ .withCopyFileToContainer(
+
MountableFile.forClasspathResource("docker/doris/fe.conf"),
+ "/opt/apache-doris/fe/conf/fe.conf")
+ .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610);
container.setPortBindings(
Lists.newArrayList(
String.format("%s:%s", "8030", "8030"),
String.format("%s:%s", "9030", "9030"),
String.format("%s:%s", "9060", "9060"),
- String.format("%s:%s", "8040", "8040")));
+ String.format("%s:%s", "8040", "8040"),
+ String.format("%s:%s", "9611", "9611"),
+ String.format("%s:%s", "9610", "9610")));
return container;
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
index 78fd626..ad0b84a 100644
---
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
@@ -36,12 +37,19 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.e2e.doris.DorisContainerService;
import org.apache.doris.kafka.connector.e2e.doris.DorisContainerServiceImpl;
import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerService;
import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerServiceImpl;
import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -195,4 +203,51 @@ public abstract class AbstractKafka2DorisSink {
LOG.info("actual result: {}", Arrays.toString(actual.toArray()));
Assert.assertArrayEquals(expected.toArray(), actual.toArray());
}
+
+ /**
+ * Enables the BE debug point {@code FlushToken.submit_flush_error}. Presumably this makes
+ * BE flushes fail, which the failover test uses to mock stream load failures.
+ *
+ * @throws IOException if the HTTP call fails or the BE does not accept the request; failing
+ *     loudly keeps a failover test from passing without a fault actually being injected
+ */
+ protected void faultInjectionOpen() throws IOException {
+ String pointName = "FlushToken.submit_flush_error";
+ String apiUrl =
+ String.format(
+ "http://%s:%s/api/debug_point/add/%s",
+ dorisContainerService.getInstanceHost(), 8040, pointName);
+ postDebugPoint(apiUrl);
+ }
+
+ /**
+ * Clears all BE debug points.
+ *
+ * @throws IOException if the HTTP call fails or the BE does not accept the request
+ */
+ protected void faultInjectionClear() throws IOException {
+ String apiUrl =
+ String.format(
+ "http://%s:%s/api/debug_point/clear",
+ dorisContainerService.getInstanceHost(), 8040);
+ postDebugPoint(apiUrl);
+ }
+
+ /** POSTs to a BE debug point endpoint and throws unless the BE answers with HTTP 200. */
+ private void postDebugPoint(String apiUrl) throws IOException {
+ HttpPost httpPost = new HttpPost(apiUrl);
+ httpPost.addHeader(HttpHeaders.AUTHORIZATION, auth(USERNAME, PASSWORD));
+ try (CloseableHttpClient httpClient = HttpClients.custom().build();
+ CloseableHttpResponse response = httpClient.execute(httpPost)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ String body =
+ response.getEntity() == null ? "" : EntityUtils.toString(response.getEntity());
+ if (statusCode != 200) {
+ throw new IOException(
+ String.format(
+ "Debug point request %s failed, status: %s, body: %s",
+ apiUrl, response.getStatusLine(), body));
+ }
+ LOG.info("Debug point response {}", body);
+ }
+ }
+
+ /** Builds an HTTP Basic {@code Authorization} header value for the given credentials. */
+ protected String auth(String user, String password) {
+ final String authInfo = user + ":" + password;
+ byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+ // Base64 output is ASCII; decode explicitly rather than with the platform charset.
+ return "Basic " + new String(encoded, StandardCharsets.UTF_8);
+ }
}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
new file mode 100644
index 0000000..c81055f
--- /dev/null
+++
b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/DorisSinkFailoverSinkTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink.stringconverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** DorisSinkFailoverSinkTest is a test class for Doris Sink Connector. */
+public class DorisSinkFailoverSinkTest extends AbstractStringE2ESinkTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(DorisSinkFailoverSinkTest.class);
+ private static String connectorName;
+ private static String jsonMsgConnectorContent;
+ private static DorisOptions dorisOptions;
+ private static String database;
+
+ @BeforeClass
+ public static void setUp() {
+ initServer();
+ initProducer();
+ }
+
+ public static void initialize(String connectorPath) {
+ jsonMsgConnectorContent = loadContent(connectorPath);
+ JsonNode rootNode = null;
+ try {
+ rootNode = objectMapper.readTree(jsonMsgConnectorContent);
+ } catch (IOException e) {
+ throw new DorisException("Failed to read content body.", e);
+ }
+ connectorName = rootNode.get(NAME).asText();
+ JsonNode configNode = rootNode.get(CONFIG);
+ Map<String, String> configMap = objectMapper.convertValue(configNode,
Map.class);
+ configMap.put(ConfigCheckUtils.TASK_ID, "1");
+ Map<String, String> lowerCaseConfigMap =
+ DorisSinkConnectorConfig.convertToLowercase(configMap);
+ DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap);
+ dorisOptions = new DorisOptions(lowerCaseConfigMap);
+ database = dorisOptions.getDatabase();
+ createDatabase(database);
+ setTimeZone();
+ }
+
+ private static void setTimeZone() {
+ executeSql(getJdbcConnection(), "set global time_zone =
'Asia/Shanghai'");
+ }
+
+ /** mock streamload failure */
+ @Test
+ public void testStreamLoadFailoverSink() throws Exception {
+ LOG.info("start to test testStreamLoadFailoverSink.");
+
initialize("src/test/resources/e2e/string_converter/string_msg_failover_connector.json");
+ Thread.sleep(5000);
+ String topic = "string_test_failover";
+ String msg1 = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}";
+ produceMsg2Kafka(topic, msg1);
+
+ String tableSql =
+
loadContent("src/test/resources/e2e/string_converter/string_msg_tab_failover.sql");
+ createTable(tableSql);
+
+ kafkaContainerService.registerKafkaConnector(connectorName,
jsonMsgConnectorContent);
+
+ String table = dorisOptions.getTopicMapTable(topic);
+ String querySql =
+ String.format("select id,name,age from %s.%s order by id",
database, table);
+ LOG.info("start to query result from doris. sql={}", querySql);
+ while (true) {
+ List<String> result = executeSQLStatement(getJdbcConnection(),
LOG, querySql, 3);
+ // until load success one time
+ if (result.size() >= 1) {
+ faultInjectionOpen();
+ // mock new data
+ String msg2 = "{\"id\":2,\"name\":\"lisi\",\"age\":18}";
+ produceMsg2Kafka(topic, msg2);
+ Thread.sleep(15000);
+ faultInjectionClear();
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+
+ String msg3 = "{\"id\":3,\"name\":\"wangwu\",\"age\":38}";
+ produceMsg2Kafka(topic, msg3);
+ Thread.sleep(25000);
+
+ List<String> excepted = Arrays.asList("1,zhangsan,12", "2,lisi,18",
"3,wangwu,38");
+ checkResult(excepted, querySql, 3);
+ }
+
+ public static List<String> executeSQLStatement(
+ Connection connection, Logger logger, String sql, int columnSize) {
+ List<String> result = new ArrayList<>();
+ if (Objects.isNull(sql)) {
+ return result;
+ }
+ try (Statement statement = connection.createStatement()) {
+ logger.info("start to execute sql={}", sql);
+ ResultSet resultSet = statement.executeQuery(sql);
+
+ while (resultSet.next()) {
+ StringJoiner sb = new StringJoiner(",");
+ for (int i = 1; i <= columnSize; i++) {
+ Object value = resultSet.getObject(i);
+ sb.add(String.valueOf(value));
+ }
+ result.add(sb.toString());
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/test/resources/docker/doris/be.conf
b/src/test/resources/docker/doris/be.conf
new file mode 100644
index 0000000..94b76e0
--- /dev/null
+++ b/src/test/resources/docker/doris/be.conf
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+PPROF_TMPDIR="$DORIS_HOME/log/"
+
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 9+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 17+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives
--add-opens=java.base/java.net=ALL-UNNAMED"
+
+# Since 1.2, JAVA_HOME needs to be set to run the BE process.
+# JAVA_HOME=/path/to/jdk/
+
+#
https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
+# https://jemalloc.net/jemalloc.3.html
+JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:false,lg_prof_interval:32,lg_prof_sample:19,prof_gdump:false,prof_accum:false,prof_leak:false,prof_final:false"
+JEMALLOC_PROF_PRFIX=""
+
+# INFO, WARNING, ERROR, FATAL
+sys_log_level = INFO
+
+# ports for admin, web, heartbeat service
+be_port = 9060
+webserver_port = 8040
+heartbeat_service_port = 9050
+brpc_port = 8060
+arrow_flight_sql_port = 9610
+enable_debug_points = true
+
+# HTTPS configures
+enable_https = false
+# path of certificate in PEM format.
+ssl_certificate_path = "$DORIS_HOME/conf/cert.pem"
+# path of private key in PEM format.
+ssl_private_key_path = "$DORIS_HOME/conf/key.pem"
+
+
+# Choose one if there is more than one IP address besides the loopback address.
+# Note that at most one IP should match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# data root paths, separated by ';'
+# You can specify the storage type for each root path, HDD (cold data) or SSD
(hot data)
+# eg:
+# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris
+# storage_root_path =
/home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD
+# /home/disk2/doris,medium:HDD(default)
+#
+# you can also specify properties by setting '<property>:<value>', separated by ','
+# property 'medium' has a higher priority than the extension of path
+#
+# Default value is ${DORIS_HOME}/storage, you should create it by hand.
+# storage_root_path = ${DORIS_HOME}/storage
+
+# Directory for JDBC drivers; default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+# Advanced configurations
+# sys_log_dir = ${DORIS_HOME}/log
+# sys_log_roll_mode = SIZE-MB-1024
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = *
+# log_buffer_level = -1
+# palo_cgroups
+
+# aws sdk log level
+# Off = 0,
+# Fatal = 1,
+# Error = 2,
+# Warn = 3,
+# Info = 4,
+# Debug = 5,
+# Trace = 6
+# AWS SDK logging is off by default, because AWS SDK errors that need attention are already output through Doris logs
+aws_log_level=0
+## If you are not running in aws cloud, you can disable EC2 metadata
+AWS_EC2_METADATA_DISABLED=true
\ No newline at end of file
diff --git a/src/test/resources/docker/doris/fe.conf
b/src/test/resources/docker/doris/fe.conf
new file mode 100644
index 0000000..a45fb53
--- /dev/null
+++ b/src/test/resources/docker/doris/fe.conf
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#####################################################################
+## The uppercase properties are read and exported by bin/start_fe.sh.
+## To see all Frontend configurations,
+## see fe/src/org/apache/doris/common/Config.java
+#####################################################################
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+# Log dir
+LOG_DIR = ${DORIS_HOME}/log
+
+# For jdk 17, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8
-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR
-Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M
--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens
java.base/jdk.internal.ref=ALL-UNNAMED"
+
+# Set your own JAVA_HOME
+# JAVA_HOME=/path/to/jdk/
+
+##
+## the lowercase properties are read by main program.
+##
+
+# store metadata, must be created before start FE.
+# Default value is ${DORIS_HOME}/doris-meta
+# meta_dir = ${DORIS_HOME}/doris-meta
+
+# Directory for JDBC drivers; default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+http_port = 8030
+rpc_port = 9020
+query_port = 9030
+edit_log_port = 9010
+arrow_flight_sql_port = 9611
+enable_debug_points = true
+arrow_flight_token_cache_size = 50
+# Choose one if there is more than one IP address besides the loopback address.
+# Note that at most one IP should match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# Advanced configurations
+# log_roll_size_mb = 1024
+# INFO, WARN, ERROR, FATAL
+sys_log_level = INFO
+# NORMAL, BRIEF, ASYNC
+sys_log_mode = ASYNC
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = org.apache.doris
+# audit_log_dir = $LOG_DIR
+# audit_log_modules = slow_query, query
+# audit_log_roll_num = 10
+# meta_delay_toleration_second = 10
+# qe_max_connection = 1024
+# qe_query_timeout_second = 300
+# qe_slow_log_ms = 5000
\ No newline at end of file
diff --git
a/src/test/resources/e2e/string_converter/string_msg_failover_connector.json
b/src/test/resources/e2e/string_converter/string_msg_failover_connector.json
new file mode 100644
index 0000000..d8604b5
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_failover_connector.json
@@ -0,0 +1,25 @@
+{
+ "name":"string_msg_failover_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"string_test_failover",
+ "tasks.max":"1",
+ "doris.topic2table.map": "string_test_failover:string_msg_tab_failover",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"1200",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"string_msg_failover",
+ "load.model":"stream_load",
+ "delivery.guarantee":"exactly_once",
+ "enable.2pc": "true",
+ "max.retries": "10",
+ "retry.interval.ms": "5000",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+ }
+}
\ No newline at end of file
diff --git
a/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql
b/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql
new file mode 100644
index 0000000..155f217
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/string_msg_tab_failover.sql
@@ -0,0 +1,12 @@
+-- The database here must match doris.database in the connector config that loads into this
+-- table (string_msg_failover_connector.json uses "string_msg_failover").
+CREATE TABLE string_msg_failover.string_msg_tab_failover (
+ id INT NULL,
+ name VARCHAR(100) NULL,
+ age INT NULL
+) ENGINE=OLAP
+DUPLICATE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]