This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 56b4668 [Improve] add behavior on null values (#69)
56b4668 is described below
commit 56b4668696b3c33d044ecd7cc90e9b79dab18190
Author: wangchuang <[email protected]>
AuthorDate: Tue Apr 29 10:50:13 2025 +0800
[Improve] add behavior on null values (#69)
---
.../doris/kafka/connector/cfg/DorisOptions.java | 9 +++
.../connector/cfg/DorisSinkConnectorConfig.java | 13 ++++-
.../connector/model/BehaviorOnNullValues.java | 40 +++++++++++++
.../connector/service/DorisDefaultSinkService.java | 39 ++++++++++---
.../kafka/connector/utils/ConfigCheckUtils.java | 10 ++++
.../cfg/TestDorisSinkConnectorConfig.java | 7 +++
.../e2e/sink/stringconverter/StringMsgE2ETest.java | 55 ++++++++++++++++++
.../connector/service/TestDorisSinkService.java | 67 +++++++++++++++++++++-
.../e2e/string_converter/null_values_default.json | 23 ++++++++
.../e2e/string_converter/null_values_fail.json | 24 ++++++++
.../e2e/string_converter/null_values_ignore.json | 24 ++++++++
.../e2e/string_converter/null_values_tab.sql | 12 ++++
12 files changed, 314 insertions(+), 9 deletions(-)
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index c1a2cbd..922eee6 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -28,6 +28,7 @@ import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.doris.kafka.connector.converter.ConverterMode;
import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
+import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;
import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -68,6 +69,7 @@ public class DorisOptions {
private final SchemaEvolutionMode schemaEvolutionMode;
private final int maxRetries;
private final int retryIntervalMs;
+ private final BehaviorOnNullValues behaviorOnNullValues;
public DorisOptions(Map<String, String> config) {
this.name = config.get(DorisSinkConnectorConfig.NAME);
@@ -140,6 +142,9 @@ public class DorisOptions {
DorisSinkConnectorConfig.RETRY_INTERVAL_MS,
String.valueOf(
DorisSinkConnectorConfig.RETRY_INTERVAL_MS_DEFAULT)));
+ this.behaviorOnNullValues =
+ BehaviorOnNullValues.of(
+ config.get(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES));
}
private Properties getStreamLoadPropFromConfig(Map<String, String> config) {
@@ -341,4 +346,8 @@ public class DorisOptions {
public int getRetryIntervalMs() {
return retryIntervalMs;
}
+
+ public BehaviorOnNullValues getBehaviorOnNullValues() {
+ return behaviorOnNullValues;
+ }
}
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 53442ca..5b8807e 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.doris.kafka.connector.DorisSinkConnector;
import org.apache.doris.kafka.connector.converter.ConverterMode;
import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
+import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;
import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
import org.apache.doris.kafka.connector.writer.load.LoadModel;
@@ -95,6 +96,9 @@ public class DorisSinkConnectorConfig {
public static final String RETRY_INTERVAL_MS = "retry.interval.ms";
public static final int RETRY_INTERVAL_MS_DEFAULT = 6000;
+ public static final String BEHAVIOR_ON_NULL_VALUES = "behavior.on.null.values";
+ public static final String BEHAVIOR_ON_NULL_VALUES_DEFAULT = BehaviorOnNullValues.IGNORE.name();
+
// metrics
public static final String JMX_OPT = "jmx";
public static final boolean JMX_OPT_DEFAULT = true;
@@ -125,6 +129,7 @@ public class DorisSinkConnectorConfig {
setFieldToDefaultValues(config, MAX_RETRIES, String.valueOf(MAX_RETRIES_DEFAULT));
setFieldToDefaultValues(
config, RETRY_INTERVAL_MS, String.valueOf(RETRY_INTERVAL_MS_DEFAULT));
+ setFieldToDefaultValues(config, BEHAVIOR_ON_NULL_VALUES, BEHAVIOR_ON_NULL_VALUES_DEFAULT);
}
public static Map<String, String> convertToLowercase(Map<String, String> config) {
@@ -291,7 +296,13 @@ public class DorisSinkConnectorConfig {
Type.INT,
RETRY_INTERVAL_MS_DEFAULT,
Importance.MEDIUM,
- "The time in milliseconds to wait following an error before a retry attempt is made.");
+ "The time in milliseconds to wait following an error before a retry attempt is made.")
+ .define(
+ BEHAVIOR_ON_NULL_VALUES,
+ Type.STRING,
+ BEHAVIOR_ON_NULL_VALUES_DEFAULT,
+ Importance.LOW,
+ "Used to handle records with a null value .");
}
public static class TopicToTableValidator implements ConfigDef.Validator {
diff --git a/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java b/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java
new file mode 100644
index 0000000..4398c6d
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/model/BehaviorOnNullValues.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.model;
+
+public enum BehaviorOnNullValues {
+ IGNORE("ignore"),
+
+ FAIL("fail");
+
+ private String name;
+
+ BehaviorOnNullValues(String name) {
+ this.name = name;
+ }
+
+ public static BehaviorOnNullValues of(String name) {
+ return BehaviorOnNullValues.valueOf(name.toUpperCase());
+ }
+
+ public static String[] instances() {
+ return new String[] {IGNORE.name, FAIL.name};
+ }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 3a220f1..e091264 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -32,10 +32,12 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.kafka.connector.DorisSinkTask;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.connection.ConnectionProvider;
import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.metrics.MetricsJmxReporter;
+import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;
import org.apache.doris.kafka.connector.writer.CopyIntoWriter;
import org.apache.doris.kafka.connector.writer.DorisWriter;
import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
@@ -44,6 +46,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
@@ -126,13 +129,8 @@ public class DorisDefaultSinkService implements DorisSinkService {
public void insert(final Collection<SinkRecord> records) {
// note that records can be empty
for (SinkRecord record : records) {
- // skip null value records
- if (record.value() == null) {
- LOG.debug(
- "Null valued record from topic '{}', partition {} and offset {} was skipped",
- record.topic(),
- record.kafkaPartition(),
- record.kafkaOffset());
+ // skip records
+ if (shouldSkipRecord(record)) {
continue;
}
// check topic mutating SMTs
@@ -212,6 +210,33 @@ public class DorisDefaultSinkService implements DorisSinkService {
}
}
+ @VisibleForTesting
+ public boolean shouldSkipRecord(SinkRecord record) {
+ if (record.value() == null) {
+ switch (dorisOptions.getBehaviorOnNullValues()) {
+ case FAIL:
+ throw new DataException(
+ String.format(
+ "Null valued record from topic %s, partition %s and offset %s was failed "
+ + "(the configuration property '%s' is '%s').",
+ record.topic(),
+ record.kafkaPartition(),
+ record.kafkaOffset(),
+ DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES,
+ BehaviorOnNullValues.FAIL));
+ case IGNORE:
+ default:
+ LOG.debug(
+ "Null valued record from topic '{}', partition {} and offset {} was skipped",
+ record.topic(),
+ record.kafkaPartition(),
+ record.kafkaOffset());
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Get the table name in doris for the given record.
*
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
index d7a299f..465b7dc 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java
@@ -32,6 +32,7 @@ import org.apache.doris.kafka.connector.converter.ConverterMode;
import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode;
import org.apache.doris.kafka.connector.exception.ArgumentsException;
import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.model.BehaviorOnNullValues;
import org.apache.doris.kafka.connector.writer.DeliveryGuarantee;
import org.apache.doris.kafka.connector.writer.LoadConstants;
import org.apache.doris.kafka.connector.writer.load.GroupCommitMode;
@@ -200,6 +201,15 @@ public class ConfigCheckUtils {
configIsValid = false;
}
+ String behaviorOnNullValues = config.get(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES);
+ if (!validateEnumInstances(behaviorOnNullValues, BehaviorOnNullValues.instances())) {
+ LOG.error(
+ "The value of {} is an illegal parameter of {}.",
+ behaviorOnNullValues,
+ DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES);
+ configIsValid = false;
+ }
+
if (!configIsValid) {
throw new DorisException(
"input kafka connector configuration is null, missing required values, or wrong input value");
diff --git a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
index 5caa668..e6633ec 100644
--- a/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
+++ b/src/test/java/org/apache/doris/kafka/connector/cfg/TestDorisSinkConnectorConfig.java
@@ -274,6 +274,13 @@ public class TestDorisSinkConnectorConfig {
ConfigCheckUtils.validateConfig(config);
}
+ @Test(expected = DorisException.class)
+ public void testBehaviorOnNullValuesException() {
+ Map<String, String> config = getConfig();
+ config.put(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES, "create");
+ ConfigCheckUtils.validateConfig(config);
+ }
+
@Test
public void testSchemaEvolutionMode() {
Map<String, String> config = getConfig();
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 4430dad..9ec116f 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -331,6 +331,61 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
checkResult(expectedResult, query1, 3);
}
+ @Test
+ public void testBehaviorOnNullValues() throws Exception {
+ // default
+ initialize("src/test/resources/e2e/string_converter/null_values_default.json");
+ String tableSql =
+ loadContent("src/test/resources/e2e/string_converter/null_values_tab.sql");
+ createTable(tableSql);
+ Thread.sleep(2000);
+
+ String topic = "behavior_on_null_values_test";
+ String msg1 = "{\"id\":1,\"col1\":\"col1\",\"col2\":\"col2\"}";
+ produceMsg2Kafka(topic, null);
+ produceMsg2Kafka(topic, msg1);
+
+ kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+ List<String> expected = Collections.singletonList("1,col1,col2");
+ Thread.sleep(10000);
+ String query1 =
+ String.format(
+ "select id,col1,col2 from %s.%s order by id", database, "null_values_tab");
+ checkResult(expected, query1, 3);
+
+ kafkaContainerService.deleteKafkaConnector(connectorName);
+
+ // ignore
+ initialize("src/test/resources/e2e/string_converter/null_values_ignore.json");
+ produceMsg2Kafka(topic, null);
+ String msg2 = "{\"id\":2,\"col1\":\"col1\",\"col2\":\"col2\"}";
+ produceMsg2Kafka(topic, msg2);
+
+ kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+ Thread.sleep(10000);
+ expected = Arrays.asList("1,col1,col2", "2,col1,col2");
+ checkResult(expected, query1, 3);
+
+ kafkaContainerService.deleteKafkaConnector(connectorName);
+
+ // fail
+ initialize("src/test/resources/e2e/string_converter/null_values_fail.json");
+ produceMsg2Kafka(topic, null);
+
+ kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+ Thread.sleep(10000);
+ Assert.assertEquals("FAILED", kafkaContainerService.getConnectorTaskStatus(connectorName));
+
+ String msg3 = "{\"id\":3,\"col1\":\"col1\",\"col2\":\"col2\"}";
+ produceMsg2Kafka(topic, msg3);
+ Thread.sleep(10000);
+ // msg3 is not consumed
+ checkResult(expected, query1, 3);
+ }
+
@AfterClass
public static void closeInstance() {
kafkaContainerService.deleteKafkaConnector(connectorName);
diff --git a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
index dc2e6ae..a99e144 100644
--- a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
@@ -47,6 +48,7 @@ public class TestDorisSinkService {
private DorisDefaultSinkService dorisDefaultSinkService;
private JsonConverter jsonConverter = new JsonConverter();
+ private Properties props = new Properties();
@Before
public void init() throws IOException {
@@ -54,7 +56,6 @@ public class TestDorisSinkService {
this.getClass()
.getClassLoader()
.getResourceAsStream("doris-connector-sink.properties");
- Properties props = new Properties();
props.load(stream);
DorisSinkConnectorConfig.setDefaultValues((Map) props);
props.put("task_id", "1");
@@ -170,4 +171,68 @@ public class TestDorisSinkService {
ConnectException.class,
() -> dorisDefaultSinkService.checkTopicMutating(record2));
}
+
+ @Test
+ public void shouldSkipRecordTest() {
+ // default
+ SinkRecord record1 =
+ new SinkRecord(
+ "topic_test",
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "key",
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "{\"id\":1,\"name\":\"bob\",\"age\":12}",
+ 1);
+ Assert.assertFalse(dorisDefaultSinkService.shouldSkipRecord(record1));
+ SinkRecord record2 =
+ new SinkRecord(
+ "topic_test",
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "key",
+ SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
+ .optional()
+ .build(),
+ null,
+ 1);
+ Assert.assertTrue(dorisDefaultSinkService.shouldSkipRecord(record2));
+
+ // ignore value
+ props.put("behavior.on.null.values", "ignore");
+ dorisDefaultSinkService =
+ new DorisDefaultSinkService((Map) props, mock(SinkTaskContext.class));
+ SinkRecord record3 =
+ new SinkRecord(
+ "topic_test",
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "key",
+ SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
+ .optional()
+ .build(),
+ null,
+ 1);
+ Assert.assertTrue(dorisDefaultSinkService.shouldSkipRecord(record3));
+
+ // fail value
+ props.put("behavior.on.null.values", "fail");
+ dorisDefaultSinkService =
+ new DorisDefaultSinkService((Map) props, mock(SinkTaskContext.class));
+ SinkRecord record4 =
+ new SinkRecord(
+ "topic_test",
+ 0,
+ Schema.OPTIONAL_STRING_SCHEMA,
+ "key",
+ SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
+ .optional()
+ .build(),
+ null,
+ 1);
+ Assert.assertThrows(
+ "Null valued record from topic topic_test, partition 0 and offset 1 was failed (the configuration property 'behavior.on.null.values' is 'FAIL').",
+ DataException.class,
+ () -> dorisDefaultSinkService.shouldSkipRecord(record4));
+ }
}
diff --git a/src/test/resources/e2e/string_converter/null_values_default.json b/src/test/resources/e2e/string_converter/null_values_default.json
new file mode 100644
index 0000000..9ca212f
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/null_values_default.json
@@ -0,0 +1,23 @@
+{
+ "name":"null_values_default_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"behavior_on_null_values_test",
+ "tasks.max":"1",
+ "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"string_msg",
+ "enable.2pc": "false",
+ "load.model":"stream_load",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter.schemas.enable": "false"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/null_values_fail.json b/src/test/resources/e2e/string_converter/null_values_fail.json
new file mode 100644
index 0000000..b12628f
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/null_values_fail.json
@@ -0,0 +1,24 @@
+{
+ "name":"null_values_fail_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"behavior_on_null_values_test",
+ "tasks.max":"1",
+ "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"string_msg",
+ "enable.2pc": "false",
+ "load.model":"stream_load",
+ "behavior.on.null.values":"fail",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter.schemas.enable": "false"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/null_values_ignore.json b/src/test/resources/e2e/string_converter/null_values_ignore.json
new file mode 100644
index 0000000..05b8e89
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/null_values_ignore.json
@@ -0,0 +1,24 @@
+{
+ "name":"null_values_ignore_connector",
+ "config":{
+ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+ "topics":"behavior_on_null_values_test",
+ "tasks.max":"1",
+ "doris.topic2table.map": "behavior_on_null_values_test:null_values_tab",
+ "buffer.count.records":"1",
+ "buffer.flush.time":"10",
+ "buffer.size.bytes":"10000000",
+ "doris.urls":"127.0.0.1",
+ "doris.user":"root",
+ "doris.password":"",
+ "doris.http.port":"8030",
+ "doris.query.port":"9030",
+ "doris.database":"string_msg",
+ "enable.2pc": "false",
+ "load.model":"stream_load",
+ "behavior.on.null.values":"ignore",
+ "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter":"org.apache.kafka.connect.storage.StringConverter",
+ "value.converter.schemas.enable": "false"
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/null_values_tab.sql b/src/test/resources/e2e/string_converter/null_values_tab.sql
new file mode 100644
index 0000000..22749d5
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/null_values_tab.sql
@@ -0,0 +1,12 @@
+-- Please note that the database here should be consistent with doris.database in the file where the connector is registered.
+CREATE TABLE string_msg.null_values_tab (
+ id INT NULL,
+ col1 VARCHAR(20) NULL,
+ col2 varchar(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]