This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d2a3ac3  [e2e](test) add partial column update and debezium ingestion e2e case (#38)
d2a3ac3 is described below

commit d2a3ac3479cda9530ff3e9cd56d8973e581b7ea1
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Wed Jul 10 10:04:31 2024 +0800

    [e2e](test) add partial column update and debezium ingestion e2e case (#38)
---
 .../e2e/sink/AbstractKafka2DorisSink.java          | 12 +++++
 .../e2e/sink/stringconverter/StringMsgE2ETest.java | 61 ++++++++++++++++++++++
 .../resources/e2e/string_converter/full_types.json | 22 ++++++++
 .../full_types_debezium_ingestion.sql              | 59 +++++++++++++++++++++
 .../string_converter/insert_partial_update_tab.sql |  3 ++
 .../e2e/string_converter/partial_update.json       | 24 +++++++++
 .../e2e/string_converter/partial_update_tab.sql    | 15 ++++++
 7 files changed, 196 insertions(+)

diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
index c4c7fe4..d50556f 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -108,6 +108,18 @@ public abstract class AbstractKafka2DorisSink {
         LOG.info("Create doris table successfully. sql={}", sql);
     }
 
+    protected void insertTable(String sql) {
+        LOG.info("Will insert data into the Doris table. SQL: {}", sql);
+        try {
+            Statement statement = getJdbcConnection().createStatement();
+            int rowCount = statement.executeUpdate(sql);
+            LOG.info("Inserted {} rows into the Doris table.", rowCount);
+        } catch (SQLException e) {
+            throw new DorisException("Failed to insert data into the Doris table.", e);
+        }
+        LOG.info("Data insertion into the Doris table succeeded. SQL: {}", sql);
+    }
+
     private static void initDorisBase() {
         if (Objects.nonNull(dorisContainerService)) {
             return;
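
A note on the insertTable helper added above: the Statement it creates is
never closed. A minimal try-with-resources variant (reusing the
getJdbcConnection, DorisException, and LOG members this class already has)
would be:

    protected void insertTable(String sql) {
        // try-with-resources closes the Statement even when executeUpdate throws
        try (Statement statement = getJdbcConnection().createStatement()) {
            int rowCount = statement.executeUpdate(sql);
            LOG.info("Inserted {} rows into the Doris table. SQL: {}", rowCount, sql);
        } catch (SQLException e) {
            throw new DorisException("Failed to insert data into the Doris table.", e);
        }
    }
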
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index 227a203..3143461 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -125,6 +125,65 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         checkResult(expected, query, 3);
     }
 
+    @Test
+    public void testPartialUpdate() throws Exception {
+        initialize("src/test/resources/e2e/string_converter/partial_update.json");
+        String topic = "partial_update_test";
+        String msg1 =
+                "{\"id\":1,\"col1\":\"after_update_col1_1\",\"col2\":\"after_update_col2_1\"}";
+        String msg2 =
+                "{\"id\":2,\"col1\":\"after_update_col1_2\",\"col2\":\"after_update_col2_2\"}";
+
+        produceMsg2Kafka(topic, msg1);
+        produceMsg2Kafka(topic, msg2);
+
+        String tableSql =
+                loadContent("src/test/resources/e2e/string_converter/partial_update_tab.sql");
+        String insertSql =
+                loadContent(
+                        "src/test/resources/e2e/string_converter/insert_partial_update_tab.sql");
+        createTable(tableSql);
+        Thread.sleep(2000);
+        insertTable(insertSql);
+        Thread.sleep(15000);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        List<String> expected =
+                Arrays.asList(
+                        "1,after_update_col1_1,after_update_col2_1,before_update_col3_1",
+                        "2,after_update_col1_2,after_update_col2_2,before_update_col3_2");
+        Thread.sleep(10000);
+        String query =
+                String.format("select id,col1,col2,col3 from %s.%s order by id", database, table);
+        checkResult(expected, query, 4);
+    }
+
+    @Test
+    public void testDebeziumIngestionFullTypes() throws Exception {
+        initialize("src/test/resources/e2e/string_converter/full_types.json");
+        String topic = "full_types";
+        String msg1 =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"tiny_un_z_c\"},{\"type\":\"int16\",\"optional\":true,\"field\":\"small_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"small_un_c\"},{\"type\":\"int32\",\"optio [...]
+
+        produceMsg2Kafka(topic, msg1);
+
+        String tableSql =
+                loadContent(
+                        "src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql");
+        createTable(tableSql);
+        Thread.sleep(2000);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        List<String> expected =
+                Arrays.asList(
+                        "1,127,255,255,32767,65535,65535,8388607,16777215,16777215,2147483647,4294967295,4294967295,2147483647,9223372036854775807,-1,-1,Hello World,abc,123.102,123.102,123.103,123.104,404.4443,404.4444,404.4445,123.4567,123.4568,123.4569,346,34567892.1,false,true,true,2020-07-17,18:00:22,2020-07-17T18:00:22,2020-07-17T18:00:22,text,2021,red,a,b,{\"key1\":\"value1\"},{coordinates=[3,1], type=Point, srid=0},{coordinates=[[[1,1],[2,1],[2,2],[1,2],[1,1]]], type=Polygon, srid [...]
+        Thread.sleep(10000);
+        String query = String.format("select * from %s.%s order by id", database, table);
+        checkResult(expected, query, 51);
+    }
+
     public void checkResult(List<String> expected, String query, int columnSize) throws Exception {
         List<String> actual = new ArrayList<>();
 
@@ -143,6 +202,8 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
                 actual.add(StringUtils.join(row, ","));
             }
         }
+        LOG.info("expected result: {}", Arrays.toString(expected.toArray()));
+        LOG.info("actual result: {}", Arrays.toString(actual.toArray()));
         Assert.assertArrayEquals(expected.toArray(), actual.toArray());
     }
 
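
The new tests pace themselves with fixed Thread.sleep calls (2s after
createTable, 15s after insertTable, 10s before the assertion), which can be
flaky on slow CI hosts. A hypothetical polling helper built on the existing
checkResult (the 30-second deadline and 1-second poll interval are assumed
values, not part of this commit) could replace the final sleep:

    private void waitUntilResult(List<String> expected, String query, int columnSize)
            throws Exception {
        long deadline = System.currentTimeMillis() + 30_000; // assumed upper bound
        while (true) {
            try {
                checkResult(expected, query, columnSize); // existing assertion helper
                return; // rows arrived and matched
            } catch (AssertionError e) {
                if (System.currentTimeMillis() > deadline) {
                    throw e; // surface the last mismatch once time runs out
                }
                Thread.sleep(1_000); // poll again shortly
            }
        }
    }
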
diff --git a/src/test/resources/e2e/string_converter/full_types.json b/src/test/resources/e2e/string_converter/full_types.json
new file mode 100644
index 0000000..ef95aef
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/full_types.json
@@ -0,0 +1,22 @@
+{
+  "name":"mysql_all_types",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"full_types",
+    "tasks.max":"1",
+    "doris.topic2table.map": "full_types:full_types_tab",
+    "buffer.count.records":"1",
+    "buffer.flush.time":"10",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"debezium_ingestion_msg",
+    "converter.mode": "debezium_ingestion",
+    "load.model":"stream_load",
+    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
+    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
+  }
+}
\ No newline at end of file
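
For reference, registering a connector config like the one above outside the
e2e harness is a POST of that JSON to the Kafka Connect REST API; the tests
do this through kafkaContainerService.registerKafkaConnector instead. A
sketch, assuming a Connect worker listening on localhost:8083:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class RegisterConnector {
        public static void main(String[] args) throws Exception {
            // Submit full_types.json to the Connect worker's /connectors endpoint
            String body = Files.readString(
                    Path.of("src/test/resources/e2e/string_converter/full_types.json"));
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8083/connectors")) // assumed worker address
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
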
diff --git a/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql b/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql
new file mode 100644
index 0000000..7cd001c
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/full_types_debezium_ingestion.sql
@@ -0,0 +1,59 @@
+CREATE TABLE debezium_ingestion_msg.full_types_tab
+(
+    `id`                   LARGEINT      NULL,
+    `tiny_c`               TINYINT       NULL,
+    `tiny_un_c`            SMALLINT      NULL,
+    `tiny_un_z_c`          SMALLINT      NULL,
+    `small_c`              SMALLINT      NULL,
+    `small_un_c`           INT           NULL,
+    `small_un_z_c`         INT           NULL,
+    `medium_c`             INT           NULL,
+    `medium_un_c`          BIGINT        NULL,
+    `medium_un_z_c`        BIGINT        NULL,
+    `int_c`                INT           NULL,
+    `int_un_c`             BIGINT        NULL,
+    `int_un_z_c`           BIGINT        NULL,
+    `int11_c`              INT           NULL,
+    `big_c`                BIGINT        NULL,
+    `big_un_c`             LARGEINT      NULL,
+    `big_un_z_c`           LARGEINT      NULL,
+    `varchar_c`            VARCHAR(765)  NULL,
+    `char_c`               VARCHAR(9)    NULL,
+    `real_c`               DOUBLE        NULL,
+    `float_c`              FLOAT         NULL,
+    `float_un_c`           FLOAT         NULL,
+    `float_un_z_c`         FLOAT         NULL,
+    `double_c`             DOUBLE        NULL,
+    `double_un_c`          DOUBLE        NULL,
+    `double_un_z_c`        DOUBLE        NULL,
+    `decimal_c`            DECIMAL(8, 4) NULL,
+    `decimal_un_c`         DECIMAL(8, 4) NULL,
+    `decimal_un_z_c`       DECIMAL(8, 4) NULL,
+    `numeric_c`            DECIMAL(6, 0) NULL,
+    `big_decimal_c`        TEXT          NULL,
+    `bit1_c`               BOOLEAN       NULL,
+    `tiny1_c`              BOOLEAN       NULL,
+    `boolean_c`            BOOLEAN       NULL,
+    `date_c`               DATE          NULL,
+    `time_c`               TEXT          NULL,
+    `datetime_c`           DATETIME      NULL,
+    `timestamp_c`          DATETIME      NULL,
+    `text_c`               TEXT          NULL,
+    `year_c`               INT           NULL,
+    `enum_c`               TEXT          NULL,
+    `set_c`                TEXT          NULL,
+    `json_c`               JSON          NULL,
+    `point_c`              TEXT          NULL,
+    `geometry_c`           TEXT          NULL,
+    `linestring_c`         TEXT          NULL,
+    `polygon_c`            TEXT          NULL,
+    `multipoint_c`         TEXT          NULL,
+    `multiline_c`          TEXT          NULL,
+    `multipolygon_c`       TEXT          NULL,
+    `geometrycollection_c` TEXT          NULL
+) UNIQUE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change" = "true"
+);
\ No newline at end of file
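
The DDL above widens each unsigned MySQL column by one integer tier so the
full unsigned range still fits after ingestion (e.g. tiny_un_c lands in
SMALLINT, big_un_c in LARGEINT). An illustrative Java lookup of that rule,
derived purely from the column definitions above:

    import java.util.Map;

    // Illustrative only: the MySQL-unsigned -> Doris widening implied above.
    Map<String, String> unsignedWidening = Map.of(
            "TINYINT UNSIGNED", "SMALLINT",    // tiny_un_c, tiny_un_z_c
            "SMALLINT UNSIGNED", "INT",        // small_un_c, small_un_z_c
            "MEDIUMINT UNSIGNED", "BIGINT",    // medium_un_c, medium_un_z_c
            "INT UNSIGNED", "BIGINT",          // int_un_c, int_un_z_c
            "BIGINT UNSIGNED", "LARGEINT");    // big_un_c, big_un_z_c
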
diff --git a/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql b/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql
new file mode 100644
index 0000000..3dff4dd
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/insert_partial_update_tab.sql
@@ -0,0 +1,3 @@
+insert into string_msg.partial_update_tab (id, col1, col2, col3)
+values (1, "before_update_col1_1", "before_update_col2_1", "before_update_col3_1"),
+       (2, "before_update_col1_2", "before_update_col2_2", "before_update_col3_2");
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/partial_update.json b/src/test/resources/e2e/string_converter/partial_update.json
new file mode 100644
index 0000000..0caaa4b
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/partial_update.json
@@ -0,0 +1,24 @@
+{
+  "name":"partial_update_connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"partial_update_test",
+    "tasks.max":"1",
+    "doris.topic2table.map": "partial_update_test:partial_update_tab",
+    "buffer.count.records":"2",
+    "buffer.flush.time":"10",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"string_msg",
+    "sink.properties.partial_columns":"true",
+    "sink.properties.columns": "id,col1,col2",
+    "enable.2pc": "false",
+    "load.model":"stream_load",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.kafka.connect.storage.StringConverter"
+  }
+}
\ No newline at end of file
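
The two sink.properties.* entries are what make this a partial column
update: they are passed through to Doris Stream Load, so each load carries
partial_columns=true and a column list of id,col1,col2, leaving col3 with
its previously inserted value on the merge-on-write unique-key table defined
below. A hedged sketch of the equivalent headers on a raw Stream Load
request (URL, body, and the omitted auth are illustrative placeholders; the
connector builds the real request internally):

    import java.net.URI;
    import java.net.http.HttpRequest;

    // Illustrative Stream Load request mirroring the config above.
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(
                    "http://127.0.0.1:8030/api/string_msg/partial_update_tab/_stream_load"))
            .header("partial_columns", "true") // mirrors sink.properties.partial_columns
            .header("columns", "id,col1,col2") // mirrors sink.properties.columns
            .header("format", "json")          // assumed; Basic auth omitted for brevity
            .PUT(HttpRequest.BodyPublishers.ofString(
                    "{\"id\":1,\"col1\":\"after_update_col1_1\",\"col2\":\"after_update_col2_1\"}"))
            .build();
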
diff --git a/src/test/resources/e2e/string_converter/partial_update_tab.sql b/src/test/resources/e2e/string_converter/partial_update_tab.sql
new file mode 100644
index 0000000..64d3d5b
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/partial_update_tab.sql
@@ -0,0 +1,15 @@
+-- Note: the database used here must match the doris.database value in the connector registration config (partial_update.json).
+CREATE TABLE string_msg.partial_update_tab (
+  id INT NULL,
+  col1 VARCHAR(20) NULL,
+  col2 VARCHAR(20) NULL,
+  col3 VARCHAR(20) NULL
+) ENGINE=OLAP
+UNIQUE KEY(`id`)
+COMMENT 'OLAP'
+DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change"="true",
+"enable_unique_key_merge_on_write" = "true"
+);
\ No newline at end of file

