This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fbc466e597 [improve](StreamingJob) add macaddr8/xml/hstore and array element types for PostgreSQL (#61433)
4fbc466e597 is described below

commit 4fbc466e597688cb34aca9fef0d11750c449612e
Author: wudi <[email protected]>
AuthorDate: Fri Mar 20 14:10:47 2026 +0800

    [improve](StreamingJob) add macaddr8/xml/hstore and array element types for PostgreSQL (#61433)
    
    ### What problem does this PR solve?
    
    Extend PostgreSQL CDC type support:
    
    - **New scalar types**: `macaddr8`, `xml`, `hstore` → mapped to Doris
    `STRING` in both `SchemaChangeHelper` (CDC DDL) and
    `JdbcPostgreSQLClient` (FE table creation)
    - **Array types**: `_*` prefix handling in `SchemaChangeHelper` →
    `ARRAY<T>` (e.g. `_int4` → `ARRAY<INT>`); expanded
    `supportedInnerType` in `JdbcPostgreSQLClient` with
    `numeric`/`json`/`jsonb`/`uuid`
    - **Array element conversion**: extracted `convertToArray()` in
    `DebeziumJsonDeserializer` to recursively convert each
    element through the same type-aware logic as scalar columns, fixing
    timestamp array elements being stored as raw epoch microseconds
---
 .../jdbc/client/JdbcPostgreSQLClient.java          |  9 +-
 .../deserialize/DebeziumJsonDeserializer.java      | 15 +++-
 .../doris/cdcclient/utils/SchemaChangeHelper.java  |  8 ++
 .../cdcclient/utils/SchemaChangeHelperTest.java    | 49 +++++++++--
 .../cdc/test_streaming_postgres_job_all_type.out   |  9 +-
 .../test_streaming_postgres_job_array_types.out    | 21 +++++
 .../test_streaming_postgres_job_all_type.groovy    | 10 ++-
 ...test_streaming_postgres_job_array_types.groovy} | 96 +++++++++++-----------
 8 files changed, 157 insertions(+), 60 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
index 8d449ad33f1..6ee651ad24e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
@@ -42,8 +42,10 @@ public class JdbcPostgreSQLClient extends JdbcClient {
 
     private static final String[] supportedInnerType = new String[] {
             "int2", "int4", "int8", "smallserial", "serial",
-            "bigserial", "float4", "float8", "timestamp", "timestamptz",
-            "date", "bool", "bpchar", "varchar", "text"
+            "bigserial", "float4", "float8", "numeric",
+            "timestamp", "timestamptz", "date", "bool",
+            "bpchar", "varchar", "text",
+            "json", "jsonb", "uuid"
     };
 
     protected JdbcPostgreSQLClient(JdbcClientConfig jdbcClientConfig) {
@@ -172,8 +174,11 @@ public class JdbcPostgreSQLClient extends JdbcClient {
             case "cidr":
             case "inet":
             case "macaddr":
+            case "macaddr8":
             case "varbit":
             case "uuid":
+            case "xml":
+            case "hstore":
             case "json":
             case "jsonb":
                 return ScalarType.createStringType();
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 1ec6da91fa6..25b2b544893 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -203,10 +203,11 @@ public class DebeziumJsonDeserializer
                 case BOOLEAN:
                     return Boolean.parseBoolean(dbzObj.toString());
                 case STRING:
-                case ARRAY:
                 case MAP:
                 case STRUCT:
                     return dbzObj.toString();
+                case ARRAY:
+                    return convertToArray(fieldSchema, dbzObj);
                 case BYTES:
                     return convertToBinary(dbzObj, fieldSchema);
                 default:
@@ -347,6 +348,18 @@ public class DebeziumJsonDeserializer
         return bigDecimal;
     }
 
+    private Object convertToArray(Schema fieldSchema, Object dbzObj) {
+        if (dbzObj instanceof List) {
+            Schema elementSchema = fieldSchema.valueSchema();
+            List<Object> result = new ArrayList<>();
+            for (Object element : (List<?>) dbzObj) {
+                result.add(element == null ? null : convert(elementSchema, 
element));
+            }
+            return result;
+        }
+        return dbzObj.toString();
+    }
+
     protected Object convertToTime(Object dbzObj, Schema schema) {
         try {
             if (dbzObj instanceof Long) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
index 5eea4f1f16f..aed0bdf7ea6 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
@@ -208,6 +208,11 @@ public class SchemaChangeHelper {
     /** Map a PostgreSQL native type name to a Doris type string. */
     static String pgTypeNameToDorisType(String pgTypeName, int length, int 
scale) {
         Preconditions.checkNotNull(pgTypeName);
+        // Debezium uses underscore prefix for PostgreSQL array types (_int4, 
_text, etc.)
+        if (pgTypeName.startsWith("_")) {
+            String innerDorisType = 
pgTypeNameToDorisType(pgTypeName.substring(1), length, scale);
+            return String.format("%s<%s>", DorisType.ARRAY, innerDorisType);
+        }
         switch (pgTypeName.toLowerCase()) {
             case "bool":
                 return DorisType.BOOLEAN;
@@ -268,9 +273,12 @@ public class SchemaChangeHelper {
             case "cidr":
             case "inet":
             case "macaddr":
+            case "macaddr8":
             case "varbit":
             case "uuid":
             case "bytea":
+            case "xml":
+            case "hstore":
                 return DorisType.STRING;
             case "json":
             case "jsonb":
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
index b71fe609d4a..9a80d804cdc 100644
--- 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
@@ -147,12 +147,19 @@ class SchemaChangeHelperTest {
 
     @Test
     void networkAndMiscTypes_isString() {
-        assertEquals(DorisType.STRING, map("inet", -1, -1));
-        assertEquals(DorisType.STRING, map("cidr", -1, -1));
+        assertEquals(DorisType.STRING, map("inet",    -1, -1));
+        assertEquals(DorisType.STRING, map("cidr",    -1, -1));
         assertEquals(DorisType.STRING, map("macaddr", -1, -1));
-        assertEquals(DorisType.STRING, map("uuid", -1, -1));
-        assertEquals(DorisType.STRING, map("bytea", -1, -1));
-        assertEquals(DorisType.STRING, map("varbit", -1, -1));
+        assertEquals(DorisType.STRING, map("uuid",    -1, -1));
+        assertEquals(DorisType.STRING, map("bytea",   -1, -1));
+        assertEquals(DorisType.STRING, map("varbit",  -1, -1));
+    }
+
+    @Test
+    void macaddr8XmlHstoreTypes_isString() {
+        assertEquals(DorisType.STRING, map("macaddr8", -1, -1));
+        assertEquals(DorisType.STRING, map("xml",      -1, -1));
+        assertEquals(DorisType.STRING, map("hstore",   -1, -1));
     }
 
     @Test
@@ -166,6 +173,38 @@ class SchemaChangeHelperTest {
         assertEquals(DorisType.STRING, map("circle", -1, -1));
     }
 
+    // ─── Array types 
─────────────────────────────────────────────────────────
+
+    @Test
+    void arrayTypes() {
+        // covers the 10 types required by 
test_streaming_postgres_job_array_types
+        assertEquals("ARRAY<SMALLINT>",     map("_int2",        -1, -1));
+        assertEquals("ARRAY<INT>",          map("_int4",        -1, -1));
+        assertEquals("ARRAY<BIGINT>",       map("_int8",        -1, -1));
+        assertEquals("ARRAY<FLOAT>",        map("_float4",      -1, -1));
+        assertEquals("ARRAY<DOUBLE>",       map("_float8",      -1, -1));
+        assertEquals("ARRAY<BOOLEAN>",      map("_bool",        -1, -1));
+        assertEquals("ARRAY<STRING>",       map("_varchar",     -1, -1));
+        assertEquals("ARRAY<STRING>",       map("_text",        -1, -1));
+        assertEquals("ARRAY<DATETIME(6)>",  map("_timestamp",   -1, -1));
+        assertEquals("ARRAY<DATETIME(6)>",  map("_timestamptz", -1, -1));
+        // additional types
+        assertEquals("ARRAY<DATE>",         map("_date",        -1, -1));
+        assertEquals("ARRAY<JSON>",         map("_json",        -1, -1));
+        assertEquals("ARRAY<JSON>",         map("_jsonb",       -1, -1));
+    }
+
+    @Test
+    void arrayType_numeric_defaultPrecisionScale() {
+        assertEquals("ARRAY<DECIMAL(38, 9)>", map("_numeric", 0, -1));
+    }
+
+    @Test
+    void arrayType_nested() {
+        // Two-dimensional array: __int4 → ARRAY<ARRAY<INT>>
+        assertEquals("ARRAY<ARRAY<INT>>", map("__int4", -1, -1));
+    }
+
     // ─── Unknown type fallback 
───────────────────────────────────────────────
 
     @Test
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
index 878a4a01f72..abdc4038774 100644
--- 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
@@ -29,11 +29,14 @@ bit_varying_col     text    Yes     false   \N      NONE
 int_array_col  array<int>      Yes     false   \N      NONE
 text_array_col array<text>     Yes     false   \N      NONE
 point_col      text    Yes     false   \N      NONE
+macaddr8_col   text    Yes     false   \N      NONE
+xml_col        text    Yes     false   \N      NONE
+hstore_col     text    Yes     false   \N      NONE
 
 -- !select_all_types_null --
-1      1       100     1000    1.23    4.56    12345.678901    char            
varchar text value      true    2024-01-01      12:00   12:00:00Z       
2024-01-01T12:00        2024-01-01T04:00        P0Y0M1DT0H0M0S  3q2+7w==        
11111111-2222-3333-4444-555555555555    {"a":1} {"b": 2}        192.168.1.1     
192.168.0.0/24  08:00:2b:01:02:03       qg==    Cg==    [1, 2, 3]       ["a", 
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
+1      1       100     1000    1.23    4.56    12345.678901    char            
varchar text value      true    2024-01-01      12:00   12:00:00Z       
2024-01-01T12:00        2024-01-01T04:00        P0Y0M1DT0H0M0S  3q2+7w==        
11111111-2222-3333-4444-555555555555    {"a":1} {"b": 2}        192.168.1.1     
192.168.0.0/24  08:00:2b:01:02:03       qg==    Cg==    [1, 2, 3]       ["a", 
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}   
08:00:2b:01:02:03:04:05 <root><item>1</item></root>     {"a":"1","b":"2"}
 
 -- !select_all_types_null2 --
-1      1       100     1000    1.23    4.56    12345.678901    char            
varchar text value      true    2024-01-01      12:00   12:00:00Z       
2024-01-01T12:00        2024-01-01T04:00        P0Y0M1DT0H0M0S  3q2+7w==        
11111111-2222-3333-4444-555555555555    {"a":1} {"b": 2}        192.168.1.1     
192.168.0.0/24  08:00:2b:01:02:03       qg==    Cg==    [1, 2, 3]       ["a", 
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
-2      2       200     2000    7.89    0.12    99999.000001    char2           
varchar2        another text    false   2025-01-01      23:59:59        
23:59:59Z       2025-01-01T23:59:59     2025-01-01T23:59:59     P0Y0M0DT2H0M0S  
3q2+7w==        11111111-2222-3333-4444-555555555556    {"x":10}        {"y": 
20}       10.0.0.1        10.0.0.0/16     08:00:2b:aa:bb:cc       8A==    Dw==  
  [10, 20]        ["x", "y"]      {"coordinates":[3,4],"type":"Point","srid":0}
+1      1       100     1000    1.23    4.56    12345.678901    char            
varchar text value      true    2024-01-01      12:00   12:00:00Z       
2024-01-01T12:00        2024-01-01T04:00        P0Y0M1DT0H0M0S  3q2+7w==        
11111111-2222-3333-4444-555555555555    {"a":1} {"b": 2}        192.168.1.1     
192.168.0.0/24  08:00:2b:01:02:03       qg==    Cg==    [1, 2, 3]       ["a", 
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}   
08:00:2b:01:02:03:04:05 <root><item>1</item></root>     {"a":"1","b":"2"}
+2      2       200     2000    7.89    0.12    99999.000001    char2           
varchar2        another text    false   2025-01-01      23:59:59        
23:59:59Z       2025-01-01T23:59:59     2025-01-01T23:59:59     P0Y0M0DT2H0M0S  
3q2+7w==        11111111-2222-3333-4444-555555555556    {"x":10}        {"y": 
20}       10.0.0.1        10.0.0.0/16     08:00:2b:aa:bb:cc       8A==    Dw==  
  [10, 20]        ["x", "y"]      {"coordinates":[3,4],"type":"Point","srid":0} 
  08:00:2b:aa:bb:cc:dd:ee <root><item>2</item></root>     {"x":"10","y":"20"}
 
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out
new file mode 100644
index 00000000000..4493da5abca
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !desc_array_types --
+id     bigint  No      true    \N      
+int2_array_col array<smallint> Yes     false   \N      NONE
+int4_array_col array<int>      Yes     false   \N      NONE
+int8_array_col array<bigint>   Yes     false   \N      NONE
+float4_array_col       array<float>    Yes     false   \N      NONE
+double_array_col       array<double>   Yes     false   \N      NONE
+bool_array_col array<boolean>  Yes     false   \N      NONE
+varchar_array_col      array<text>     Yes     false   \N      NONE
+text_array_col array<text>     Yes     false   \N      NONE
+timestamp_array_col    array<datetime(6)>      Yes     false   \N      NONE
+timestamptz_array_col  array<datetime(6)>      Yes     false   \N      NONE
+
+-- !select_array_types --
+1      [1, 2]  [10, 20]        [100, 200]      [1.1, 2.2]      [1.11, 2.22]    
[1, 0]  ["foo", "bar"]  ["hello", "world"]      ["2024-01-01 12:00:00.000000", 
"2024-06-01 00:00:00.000000"]    ["2024-01-01 04:00:00.000000", "2024-06-01 
00:00:00.000000"]
+
+-- !select_array_types2 --
+1      [1, 2]  [10, 20]        [100, 200]      [1.1, 2.2]      [1.11, 2.22]    
[1, 0]  ["foo", "bar"]  ["hello", "world"]      ["2024-01-01 12:00:00.000000", 
"2024-06-01 00:00:00.000000"]    ["2024-01-01 04:00:00.000000", "2024-06-01 
00:00:00.000000"]
+2      [3, 4]  [30, 40]        [300, 400]      [3.3, 4.4]      [3.33, 4.44]    
[0, 1]  ["baz", "qux"]  ["foo", "bar"]  ["2025-01-01 06:00:00.000000", 
"2025-06-01 18:00:00.000000"]    ["2024-12-31 22:00:00.000000", "2025-06-01 
18:00:00.000000"]
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
index 57666ddf5db..9ec4ab3a52e 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
@@ -43,6 +43,7 @@ suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,ex
         // create test
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+            sql """CREATE EXTENSION IF NOT EXISTS hstore"""
             sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
             sql """
             create table ${pgDB}.${pgSchema}.${table1} (
@@ -74,12 +75,15 @@ suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,ex
                 bit_varying_col     bit varying(16),
                 int_array_col       integer[],
                 text_array_col      text[],
-                point_col           point
+                point_col           point,
+                macaddr8_col        macaddr8,
+                xml_col             xml,
+                hstore_col          hstore
             );
             """
             // mock snapshot data
             sql """
-            INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text 
value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 
12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 
'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)');
+            INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text 
value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 
12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 
'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)','08:00:2b:01:02:03:04:05'::macaddr8,'<root><item>1</item></root>
 [...]
             """
         }
 
@@ -125,7 +129,7 @@ suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,ex
 
         // mock incremental into
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
-            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another 
text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 
23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 
'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another 
text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 
23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 
'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)','08:00:2b:aa:bb:cc:dd:ee'::macaddr8,'<root><item>2</item><
 [...]
         }
 
         sleep(60000); // wait for cdc incremental data
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy
similarity index 55%
copy from 
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
copy to 
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy
index 57666ddf5db..3ae65b118d3 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy
@@ -20,10 +20,10 @@ import org.awaitility.Awaitility
 
 import static java.util.concurrent.TimeUnit.SECONDS
 
-suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
-    def jobName = "test_streaming_postgres_job_all_type_name"
+suite("test_streaming_postgres_job_array_types", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_array_types_name"
     def currentDb = (sql "select database()")[0][0]
-    def table1 = "streaming_all_types_nullable_with_pk_pg"
+    def table1 = "streaming_array_types_pg"
     def pgDB = "postgres"
     def pgSchema = "cdc_test"
     def pgUser = "postgres"
@@ -40,46 +40,38 @@ suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,ex
         String bucket = getS3BucketName()
         String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
 
-        // create test
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
-            // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
             sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
             sql """
-            create table ${pgDB}.${pgSchema}.${table1} (
-                id                  bigserial PRIMARY KEY,
-                smallint_col        smallint,
-                integer_col         integer,
-                bigint_col          bigint,
-                real_col            real,
-                double_col          double precision,
-                numeric_col         numeric(20,6),
-                char_col            char(10),
-                varchar_col         varchar(255),
-                text_col            text,
-                boolean_col         boolean,
-                date_col            date,
-                time_col            time,
-                timetz_col          time with time zone,
-                timestamp_col       timestamp,
-                timestamptz_col     timestamp with time zone,
-                interval_col        interval,
-                bytea_col           bytea,
-                uuid_col            uuid,
-                json_col            json,
-                jsonb_col           jsonb,
-                inet_col            inet,
-                cidr_col            cidr,
-                macaddr_col         macaddr,
-                bit_col             bit(8),
-                bit_varying_col     bit varying(16),
-                int_array_col       integer[],
-                text_array_col      text[],
-                point_col           point
+            CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                id                      bigserial PRIMARY KEY,
+                int2_array_col          int2[],
+                int4_array_col          int4[],
+                int8_array_col          int8[],
+                float4_array_col        float4[],
+                double_array_col        double precision[],
+                bool_array_col          bool[],
+                varchar_array_col       varchar(50)[],
+                text_array_col          text[],
+                timestamp_array_col     timestamp[],
+                timestamptz_array_col   timestamptz[]
             );
             """
             // mock snapshot data
             sql """
-            INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text 
value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01 
12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF', 
'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)');
+            INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (
+                1,
+                ARRAY[1::int2, 2::int2],
+                ARRAY[10::int4, 20::int4],
+                ARRAY[100::int8, 200::int8],
+                ARRAY[1.1::float4, 2.2::float4],
+                ARRAY[1.11::double precision, 2.22::double precision],
+                ARRAY[true, false],
+                ARRAY['foo'::varchar, 'bar'::varchar],
+                ARRAY['hello', 'world'],
+                ARRAY['2024-01-01 12:00:00'::timestamp, '2024-06-01 
00:00:00'::timestamp],
+                ARRAY['2024-01-01 12:00:00+08'::timestamptz, '2024-06-01 
00:00:00+00'::timestamptz]
+            );
             """
         }
 
@@ -93,7 +85,7 @@ suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,ex
                     "password" = "${pgPassword}",
                     "database" = "${pgDB}",
                     "schema" = "${pgSchema}",
-                    "include_tables" = "${table1}", 
+                    "include_tables" = "${table1}",
                     "offset" = "initial"
                 )
                 TO DATABASE ${currentDb} (
@@ -108,7 +100,6 @@ suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,ex
                     {
                         def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """
                         log.info("jobSuccendCount: " + jobSuccendCount)
-                        // check job status and succeed task count larger than 
1
                         jobSuccendCount.size() == 1 && '1' <= 
jobSuccendCount.get(0).get(0)
                     }
             )
@@ -120,24 +111,37 @@ suite("test_streaming_postgres_job_all_type", 
"p0,external,pg,external_docker,ex
             throw ex;
         }
 
-        qt_desc_all_types_null """desc ${currentDb}.${table1};"""
-        qt_select_all_types_null """select * from ${currentDb}.${table1} order 
by 1;"""
+        qt_desc_array_types """desc ${currentDb}.${table1};"""
+        qt_select_array_types """select * from ${currentDb}.${table1} order by 
1;"""
 
-        // mock incremental into
+        // mock incremental data
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
-            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another 
text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 
23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 
'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)');"""
+            sql """
+            INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (
+                2,
+                ARRAY[3::int2, 4::int2],
+                ARRAY[30::int4, 40::int4],
+                ARRAY[300::int8, 400::int8],
+                ARRAY[3.3::float4, 4.4::float4],
+                ARRAY[3.33::double precision, 4.44::double precision],
+                ARRAY[false, true],
+                ARRAY['baz'::varchar, 'qux'::varchar],
+                ARRAY['foo', 'bar'],
+                ARRAY['2025-01-01 06:00:00'::timestamp, '2025-06-01 
18:00:00'::timestamp],
+                ARRAY['2025-01-01 06:00:00+08'::timestamptz, '2025-06-01 
18:00:00+00'::timestamptz]
+            );
+            """
         }
 
         sleep(60000); // wait for cdc incremental data
 
-        // check incremental data
-        qt_select_all_types_null2 """select * from ${currentDb}.${table1} 
order by 1;"""
+        qt_select_array_types2 """select * from ${currentDb}.${table1} order 
by 1;"""
 
         sql """
-            DROP JOB IF EXISTS where jobname =  '${jobName}'
+            DROP JOB IF EXISTS where jobname = '${jobName}'
         """
 
-        def jobCountRsp = sql """select count(1) from jobs("type"="insert")  
where Name ='${jobName}'"""
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name ='${jobName}'"""
         assert jobCountRsp.get(0).get(0) == 0
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to