This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new d2a3ac3 [e2e](test) add partial column update and debezium ingestion e2e case (#38) d2a3ac3 is described below commit d2a3ac3479cda9530ff3e9cd56d8973e581b7ea1 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Wed Jul 10 10:04:31 2024 +0800 [e2e](test) add partial column update and debezium ingestion e2e case (#38) --- .../e2e/sink/AbstractKafka2DorisSink.java | 12 +++++ .../e2e/sink/stringconverter/StringMsgE2ETest.java | 61 ++++++++++++++++++++++ .../resources/e2e/string_converter/full_types.json | 22 ++++++++ .../full_types_debezium_ingestion.sql | 59 +++++++++++++++++++++ .../string_converter/insert_partial_update_tab.sql | 3 ++ .../e2e/string_converter/partial_update.json | 24 +++++++++ .../e2e/string_converter/partial_update_tab.sql | 15 ++++++ 7 files changed, 196 insertions(+) diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java index c4c7fe4..d50556f 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java @@ -108,6 +108,18 @@ public abstract class AbstractKafka2DorisSink { LOG.info("Create doris table successfully. sql={}", sql); } + protected void insertTable(String sql) { + LOG.info("Will insert data to Doris table. SQL: {}", sql); + try { + Statement statement = getJdbcConnection().createStatement(); + int rowCount = statement.executeUpdate(sql); + LOG.info("Inserted {} item data into the Doris table.", rowCount); + } catch (SQLException e) { + throw new DorisException("Failed to insert data to Doris table.", e); + } + LOG.info("Data insertion to Doris table was successful. SQL: {}", sql); + } + private static void initDorisBase() { if (Objects.nonNull(dorisContainerService)) { return; diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index 227a203..3143461 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -125,6 +125,65 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { checkResult(expected, query, 3); } + @Test + public void testPartialUpdate() throws Exception { + initialize("src/test/resources/e2e/string_converter/partial_update.json"); + String topic = "partial_update_test"; + String msg1 = + "{\"id\":1,\"col1\":\"after_update_col1_1\",\"col2\":\"after_update_col2_1\"}"; + String msg2 = + "{\"id\":2,\"col1\":\"after_update_col1_2\",\"col2\":\"after_update_col2_2\"}"; + + produceMsg2Kafka(topic, msg1); + produceMsg2Kafka(topic, msg2); + + String tableSql = + loadContent("src/test/resources/e2e/string_converter/partial_update_tab.sql"); + String insertSql = + loadContent( + "src/test/resources/e2e/string_converter/insert_partial_update_tab.sql"); + createTable(tableSql); + Thread.sleep(2000); + insertTable(insertSql); + Thread.sleep(15000); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + String table = dorisOptions.getTopicMapTable(topic); + List<String> expected = + Arrays.asList( + "1,after_update_col1_1,after_update_col2_1,before_update_col3_1", + "2,after_update_col1_2,after_update_col2_2,before_update_col3_2"); + Thread.sleep(10000); + String query = + String.format("select id,col1,col2,col3 from %s.%s order by id", database, table); + checkResult(expected, query, 4); + } + + @Test + public void testDebeziumIngestionFullTypes() throws Exception { + initialize("src/test/resources/e2e/string_converter/full_types.json"); + String topic = "full_types"; + String msg1 = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_z_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"small_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_c\"},{\"type\":\"int32\",\"optio [...] + + produceMsg2Kafka(topic, msg1); + + String tableSql = + loadContent( + "src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql"); + createTable(tableSql); + Thread.sleep(2000); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + String table = dorisOptions.getTopicMapTable(topic); + List<String> expected = + Arrays.asList( + "1,127,255,255,32767,65535,65535,8388607,16777215,16777215,2147483647,4294967295,4294967295,2147483647,9223372036854775807,-1,-1,Hello World,abc,123.102,123.102,123.103,123.104,404.4443,404.4444,404.4445,123.4567,123.4568,123.4569,346,34567892.1,false,true,true,2020-07-17,18:00:22,2020-07-17T18:00:22,2020-07-17T18:00:22,text,2021,red,a,b,{\"key1\":\"value1\"},{coordinates=[3,1], type=Point, srid=0},{coordinates=[[[1,1],[2,1],[2,2],[1,2],[1,1]]], type=Polygon, srid [...] + Thread.sleep(10000); + String query = String.format("select * from %s.%s order by id", database, table); + checkResult(expected, query, 51); + } + public void checkResult(List<String> expected, String query, int columnSize) throws Exception { List<String> actual = new ArrayList<>(); @@ -143,6 +202,8 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { actual.add(StringUtils.join(row, ",")); } } + LOG.info("expected result: {}", Arrays.toString(expected.toArray())); + LOG.info("actual result: {}", Arrays.toString(actual.toArray())); Assert.assertArrayEquals(expected.toArray(), actual.toArray()); } diff --git a/src/test/resources/e2e/string_converter/full_types.json b/src/test/resources/e2e/string_converter/full_types.json new file mode 100644 index 0000000..ef95aef --- /dev/null +++ b/src/test/resources/e2e/string_converter/full_types.json @@ -0,0 +1,22 @@ +{ + "name":"mysql_all_types", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"full_types", + "tasks.max":"1", + "doris.topic2table.map": "full_types:full_types_tab", + "buffer.count.records":"1", + "buffer.flush.time":"10", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"debezium_ingestion_msg", + "converter.mode": "debezium_ingestion", + "load.model":"stream_load", + "key.converter":"org.apache.kafka.connect.json.JsonConverter", + "value.converter":"org.apache.kafka.connect.json.JsonConverter" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql b/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql new file mode 100644 index 0000000..7cd001c --- /dev/null +++ b/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql @@ -0,0 +1,59 @@ +CREATE TABLE debezium_ingestion_msg.full_types_tab +( + `id` LARGEINT NULL, + `tiny_c` TINYINT NULL, + `tiny_un_c` SMALLINT NULL, + `tiny_un_z_c` SMALLINT NULL, + `small_c` SMALLINT NULL, + `small_un_c` INT NULL, + `small_un_z_c` INT NULL, + `medium_c` INT NULL, + `medium_un_c` BIGINT NULL, + `medium_un_z_c` BIGINT NULL, + `int_c` INT NULL, + `int_un_c` BIGINT NULL, + `int_un_z_c` BIGINT NULL, + `int11_c` INT NULL, + `big_c` BIGINT NULL, + `big_un_c` LARGEINT NULL, + `big_un_z_c` LARGEINT NULL, + `varchar_c` VARCHAR(765) NULL, + `char_c` VARCHAR(9) NULL, + `real_c` DOUBLE NULL, + `float_c` FLOAT NULL, + `float_un_c` FLOAT NULL, + `float_un_z_c` FLOAT NULL, + `double_c` DOUBLE NULL, + `double_un_c` DOUBLE NULL, + `double_un_z_c` DOUBLE NULL, + `decimal_c` DECIMAL(8, 4) NULL, + `decimal_un_c` DECIMAL(8, 4) NULL, + `decimal_un_z_c` DECIMAL(8, 4) NULL, + `numeric_c` DECIMAL(6, 0) NULL, + `big_decimal_c` TEXT NULL, + `bit1_c` BOOLEAN NULL, + `tiny1_c` BOOLEAN NULL, + `boolean_c` BOOLEAN NULL, + `date_c` DATE NULL, + `time_c` TEXT NULL, + `datetime_c` DATETIME NULL, + `timestamp_c` DATETIME NULL, + `text_c` TEXT NULL, + `year_c` INT NULL, + `enum_c` TEXT NULL, + `set_c` TEXT NULL, + `json_c` JSON NULL, + `point_c` TEXT NULL, + `geometry_c` TEXT NULL, + `linestring_c` TEXT NULL, + `polygon_c` TEXT NULL, + `multipoint_c` TEXT NULL, + `multiline_c` TEXT NULL, + `multipolygon_c` TEXT NULL, + `geometrycollection_c` TEXT NULL +)UNIQUE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 1 +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1", +"light_schema_change" = "true" +); \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql b/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql new file mode 100644 index 0000000..3dff4dd --- /dev/null +++ b/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql @@ -0,0 +1,3 @@ +insert into string_msg.partial_update_tab (id, col1, col2, col3) +values (1, "before_update_col1_1", "before_update_col2_1", "before_update_col3_1"), + (2, "before_update_col1_2", "before_update_col2_2", "before_update_col3_2"); \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/partial_update.json b/src/test/resources/e2e/string_converter/partial_update.json new file mode 100644 index 0000000..0caaa4b --- /dev/null +++ b/src/test/resources/e2e/string_converter/partial_update.json @@ -0,0 +1,24 @@ +{ + "name":"partial_update_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"partial_update_test", + "tasks.max":"1", + "doris.topic2table.map": "partial_update_test:partial_update_tab", + "buffer.count.records":"2", + "buffer.flush.time":"10", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"string_msg", + "sink.properties.partial_columns":"true", + "sink.properties.columns": "id,col1,col2", + "enable.2pc": "false", + "load.model":"stream_load", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/partial_update_tab.sql b/src/test/resources/e2e/string_converter/partial_update_tab.sql new file mode 100644 index 0000000..64d3d5b --- /dev/null +++ b/src/test/resources/e2e/string_converter/partial_update_tab.sql @@ -0,0 +1,15 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. +CREATE TABLE string_msg.partial_update_tab ( + id INT NULL, + col1 VARCHAR(20) NULL, + col2 varchar(20) NULL, + col3 varchar(20) NUll +) ENGINE=OLAP +UNIQUE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1", +"light_schema_change"="true", +"enable_unique_key_merge_on_write" = "true" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org