This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4fbc466e597 [improve](Streamingjob) add macaddr8/xml/hstore and array
element type for PostgreSQL (#61433)
4fbc466e597 is described below
commit 4fbc466e597688cb34aca9fef0d11750c449612e
Author: wudi <[email protected]>
AuthorDate: Fri Mar 20 14:10:47 2026 +0800
[improve](Streamingjob) add macaddr8/xml/hstore and array element type for
PostgreSQL (#61433)
### What problem does this PR solve?
Extend PostgreSQL CDC type support:
- **New scalar types**: `macaddr8`, `xml`, and `hstore` are now mapped to the Doris
`STRING` type in both `SchemaChangeHelper` (CDC DDL generation) and
`JdbcPostgreSQLClient` (FE table creation)
- **Array types**: `SchemaChangeHelper` now recognizes the `_` prefix used for
PostgreSQL array type names and maps such types to `ARRAY<T>` (e.g. `_int4` →
`ARRAY<INT>`); `supportedInnerType` in `JdbcPostgreSQLClient` is expanded to
include `numeric`/`json`/`jsonb`/`uuid`
- **Array element conversion**: extracted `convertToArray()` in
`DebeziumJsonDeserializer`, which converts each array element through the same
type-aware logic used for scalar columns (recursing for nested arrays). This fixes
a bug where timestamp array elements were stored as raw epoch microseconds.
---
.../jdbc/client/JdbcPostgreSQLClient.java | 9 +-
.../deserialize/DebeziumJsonDeserializer.java | 15 +++-
.../doris/cdcclient/utils/SchemaChangeHelper.java | 8 ++
.../cdcclient/utils/SchemaChangeHelperTest.java | 49 +++++++++--
.../cdc/test_streaming_postgres_job_all_type.out | 9 +-
.../test_streaming_postgres_job_array_types.out | 21 +++++
.../test_streaming_postgres_job_all_type.groovy | 10 ++-
...test_streaming_postgres_job_array_types.groovy} | 96 +++++++++++-----------
8 files changed, 157 insertions(+), 60 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
index 8d449ad33f1..6ee651ad24e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcPostgreSQLClient.java
@@ -42,8 +42,10 @@ public class JdbcPostgreSQLClient extends JdbcClient {
private static final String[] supportedInnerType = new String[] {
"int2", "int4", "int8", "smallserial", "serial",
- "bigserial", "float4", "float8", "timestamp", "timestamptz",
- "date", "bool", "bpchar", "varchar", "text"
+ "bigserial", "float4", "float8", "numeric",
+ "timestamp", "timestamptz", "date", "bool",
+ "bpchar", "varchar", "text",
+ "json", "jsonb", "uuid"
};
protected JdbcPostgreSQLClient(JdbcClientConfig jdbcClientConfig) {
@@ -172,8 +174,11 @@ public class JdbcPostgreSQLClient extends JdbcClient {
case "cidr":
case "inet":
case "macaddr":
+ case "macaddr8":
case "varbit":
case "uuid":
+ case "xml":
+ case "hstore":
case "json":
case "jsonb":
return ScalarType.createStringType();
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 1ec6da91fa6..25b2b544893 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -203,10 +203,11 @@ public class DebeziumJsonDeserializer
case BOOLEAN:
return Boolean.parseBoolean(dbzObj.toString());
case STRING:
- case ARRAY:
case MAP:
case STRUCT:
return dbzObj.toString();
+ case ARRAY:
+ return convertToArray(fieldSchema, dbzObj);
case BYTES:
return convertToBinary(dbzObj, fieldSchema);
default:
@@ -347,6 +348,18 @@ public class DebeziumJsonDeserializer
return bigDecimal;
}
+ private Object convertToArray(Schema fieldSchema, Object dbzObj) {
+ if (dbzObj instanceof List) {
+ Schema elementSchema = fieldSchema.valueSchema();
+ List<Object> result = new ArrayList<>();
+ for (Object element : (List<?>) dbzObj) {
+ result.add(element == null ? null : convert(elementSchema,
element));
+ }
+ return result;
+ }
+ return dbzObj.toString();
+ }
+
protected Object convertToTime(Object dbzObj, Schema schema) {
try {
if (dbzObj instanceof Long) {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
index 5eea4f1f16f..aed0bdf7ea6 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
@@ -208,6 +208,11 @@ public class SchemaChangeHelper {
/** Map a PostgreSQL native type name to a Doris type string. */
static String pgTypeNameToDorisType(String pgTypeName, int length, int
scale) {
Preconditions.checkNotNull(pgTypeName);
+ // Debezium uses underscore prefix for PostgreSQL array types (_int4,
_text, etc.)
+ if (pgTypeName.startsWith("_")) {
+ String innerDorisType =
pgTypeNameToDorisType(pgTypeName.substring(1), length, scale);
+ return String.format("%s<%s>", DorisType.ARRAY, innerDorisType);
+ }
switch (pgTypeName.toLowerCase()) {
case "bool":
return DorisType.BOOLEAN;
@@ -268,9 +273,12 @@ public class SchemaChangeHelper {
case "cidr":
case "inet":
case "macaddr":
+ case "macaddr8":
case "varbit":
case "uuid":
case "bytea":
+ case "xml":
+ case "hstore":
return DorisType.STRING;
case "json":
case "jsonb":
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
index b71fe609d4a..9a80d804cdc 100644
---
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
@@ -147,12 +147,19 @@ class SchemaChangeHelperTest {
@Test
void networkAndMiscTypes_isString() {
- assertEquals(DorisType.STRING, map("inet", -1, -1));
- assertEquals(DorisType.STRING, map("cidr", -1, -1));
+ assertEquals(DorisType.STRING, map("inet", -1, -1));
+ assertEquals(DorisType.STRING, map("cidr", -1, -1));
assertEquals(DorisType.STRING, map("macaddr", -1, -1));
- assertEquals(DorisType.STRING, map("uuid", -1, -1));
- assertEquals(DorisType.STRING, map("bytea", -1, -1));
- assertEquals(DorisType.STRING, map("varbit", -1, -1));
+ assertEquals(DorisType.STRING, map("uuid", -1, -1));
+ assertEquals(DorisType.STRING, map("bytea", -1, -1));
+ assertEquals(DorisType.STRING, map("varbit", -1, -1));
+ }
+
+ @Test
+ void macaddr8XmlHstoreTypes_isString() {
+ assertEquals(DorisType.STRING, map("macaddr8", -1, -1));
+ assertEquals(DorisType.STRING, map("xml", -1, -1));
+ assertEquals(DorisType.STRING, map("hstore", -1, -1));
}
@Test
@@ -166,6 +173,38 @@ class SchemaChangeHelperTest {
assertEquals(DorisType.STRING, map("circle", -1, -1));
}
+ // ─── Array types
─────────────────────────────────────────────────────────
+
+ @Test
+ void arrayTypes() {
+ // covers the 10 types required by
test_streaming_postgres_job_array_types
+ assertEquals("ARRAY<SMALLINT>", map("_int2", -1, -1));
+ assertEquals("ARRAY<INT>", map("_int4", -1, -1));
+ assertEquals("ARRAY<BIGINT>", map("_int8", -1, -1));
+ assertEquals("ARRAY<FLOAT>", map("_float4", -1, -1));
+ assertEquals("ARRAY<DOUBLE>", map("_float8", -1, -1));
+ assertEquals("ARRAY<BOOLEAN>", map("_bool", -1, -1));
+ assertEquals("ARRAY<STRING>", map("_varchar", -1, -1));
+ assertEquals("ARRAY<STRING>", map("_text", -1, -1));
+ assertEquals("ARRAY<DATETIME(6)>", map("_timestamp", -1, -1));
+ assertEquals("ARRAY<DATETIME(6)>", map("_timestamptz", -1, -1));
+ // additional types
+ assertEquals("ARRAY<DATE>", map("_date", -1, -1));
+ assertEquals("ARRAY<JSON>", map("_json", -1, -1));
+ assertEquals("ARRAY<JSON>", map("_jsonb", -1, -1));
+ }
+
+ @Test
+ void arrayType_numeric_defaultPrecisionScale() {
+ assertEquals("ARRAY<DECIMAL(38, 9)>", map("_numeric", 0, -1));
+ }
+
+ @Test
+ void arrayType_nested() {
+ // Two-dimensional array: __int4 → ARRAY<ARRAY<INT>>
+ assertEquals("ARRAY<ARRAY<INT>>", map("__int4", -1, -1));
+ }
+
// ─── Unknown type fallback
───────────────────────────────────────────────
@Test
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
index 878a4a01f72..abdc4038774 100644
---
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
@@ -29,11 +29,14 @@ bit_varying_col text Yes false \N NONE
int_array_col array<int> Yes false \N NONE
text_array_col array<text> Yes false \N NONE
point_col text Yes false \N NONE
+macaddr8_col text Yes false \N NONE
+xml_col text Yes false \N NONE
+hstore_col text Yes false \N NONE
-- !select_all_types_null --
-1 1 100 1000 1.23 4.56 12345.678901 char
varchar text value true 2024-01-01 12:00 12:00:00Z
2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w==
11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1
192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a",
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
+1 1 100 1000 1.23 4.56 12345.678901 char
varchar text value true 2024-01-01 12:00 12:00:00Z
2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w==
11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1
192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a",
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
08:00:2b:01:02:03:04:05 <root><item>1</item></root> {"a":"1","b":"2"}
-- !select_all_types_null2 --
-1 1 100 1000 1.23 4.56 12345.678901 char
varchar text value true 2024-01-01 12:00 12:00:00Z
2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w==
11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1
192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a",
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
-2 2 200 2000 7.89 0.12 99999.000001 char2
varchar2 another text false 2025-01-01 23:59:59
23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S
3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y":
20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw==
[10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0}
+1 1 100 1000 1.23 4.56 12345.678901 char
varchar text value true 2024-01-01 12:00 12:00:00Z
2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w==
11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1
192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a",
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}
08:00:2b:01:02:03:04:05 <root><item>1</item></root> {"a":"1","b":"2"}
+2 2 200 2000 7.89 0.12 99999.000001 char2
varchar2 another text false 2025-01-01 23:59:59
23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S
3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y":
20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw==
[10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0}
08:00:2b:aa:bb:cc:dd:ee <root><item>2</item></root> {"x":"10","y":"20"}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out
new file mode 100644
index 00000000000..4493da5abca
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !desc_array_types --
+id bigint No true \N
+int2_array_col array<smallint> Yes false \N NONE
+int4_array_col array<int> Yes false \N NONE
+int8_array_col array<bigint> Yes false \N NONE
+float4_array_col array<float> Yes false \N NONE
+double_array_col array<double> Yes false \N NONE
+bool_array_col array<boolean> Yes false \N NONE
+varchar_array_col array<text> Yes false \N NONE
+text_array_col array<text> Yes false \N NONE
+timestamp_array_col array<datetime(6)> Yes false \N NONE
+timestamptz_array_col array<datetime(6)> Yes false \N NONE
+
+-- !select_array_types --
+1 [1, 2] [10, 20] [100, 200] [1.1, 2.2] [1.11, 2.22]
[1, 0] ["foo", "bar"] ["hello", "world"] ["2024-01-01 12:00:00.000000",
"2024-06-01 00:00:00.000000"] ["2024-01-01 04:00:00.000000", "2024-06-01
00:00:00.000000"]
+
+-- !select_array_types2 --
+1 [1, 2] [10, 20] [100, 200] [1.1, 2.2] [1.11, 2.22]
[1, 0] ["foo", "bar"] ["hello", "world"] ["2024-01-01 12:00:00.000000",
"2024-06-01 00:00:00.000000"] ["2024-01-01 04:00:00.000000", "2024-06-01
00:00:00.000000"]
+2 [3, 4] [30, 40] [300, 400] [3.3, 4.4] [3.33, 4.44]
[0, 1] ["baz", "qux"] ["foo", "bar"] ["2025-01-01 06:00:00.000000",
"2025-06-01 18:00:00.000000"] ["2024-12-31 22:00:00.000000", "2025-06-01
18:00:00.000000"]
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
index 57666ddf5db..9ec4ab3a52e 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
@@ -43,6 +43,7 @@ suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,ex
// create test
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
// sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+ sql """CREATE EXTENSION IF NOT EXISTS hstore"""
sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
sql """
create table ${pgDB}.${pgSchema}.${table1} (
@@ -74,12 +75,15 @@ suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,ex
bit_varying_col bit varying(16),
int_array_col integer[],
text_array_col text[],
- point_col point
+ point_col point,
+ macaddr8_col macaddr8,
+ xml_col xml,
+ hstore_col hstore
);
"""
// mock snapshot data
sql """
- INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
(1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text
value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01
12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF',
'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)');
+ INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
(1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text
value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01
12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF',
'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)','08:00:2b:01:02:03:04:05'::macaddr8,'<root><item>1</item></root>
[...]
"""
}
@@ -125,7 +129,7 @@ suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,ex
// mock incremental into
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
- sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
(2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another
text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01
23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF',
'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
(2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another
text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01
23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF',
'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)','08:00:2b:aa:bb:cc:dd:ee'::macaddr8,'<root><item>2</item><
[...]
}
sleep(60000); // wait for cdc incremental data
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy
similarity index 55%
copy from
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
copy to
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy
index 57666ddf5db..3ae65b118d3 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_array_types.groovy
@@ -20,10 +20,10 @@ import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS
-suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
- def jobName = "test_streaming_postgres_job_all_type_name"
+suite("test_streaming_postgres_job_array_types",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_array_types_name"
def currentDb = (sql "select database()")[0][0]
- def table1 = "streaming_all_types_nullable_with_pk_pg"
+ def table1 = "streaming_array_types_pg"
def pgDB = "postgres"
def pgSchema = "cdc_test"
def pgUser = "postgres"
@@ -40,46 +40,38 @@ suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,ex
String bucket = getS3BucketName()
String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
- // create test
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
- // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
sql """
- create table ${pgDB}.${pgSchema}.${table1} (
- id bigserial PRIMARY KEY,
- smallint_col smallint,
- integer_col integer,
- bigint_col bigint,
- real_col real,
- double_col double precision,
- numeric_col numeric(20,6),
- char_col char(10),
- varchar_col varchar(255),
- text_col text,
- boolean_col boolean,
- date_col date,
- time_col time,
- timetz_col time with time zone,
- timestamp_col timestamp,
- timestamptz_col timestamp with time zone,
- interval_col interval,
- bytea_col bytea,
- uuid_col uuid,
- json_col json,
- jsonb_col jsonb,
- inet_col inet,
- cidr_col cidr,
- macaddr_col macaddr,
- bit_col bit(8),
- bit_varying_col bit varying(16),
- int_array_col integer[],
- text_array_col text[],
- point_col point
+ CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ id bigserial PRIMARY KEY,
+ int2_array_col int2[],
+ int4_array_col int4[],
+ int8_array_col int8[],
+ float4_array_col float4[],
+ double_array_col double precision[],
+ bool_array_col bool[],
+ varchar_array_col varchar(50)[],
+ text_array_col text[],
+ timestamp_array_col timestamp[],
+ timestamptz_array_col timestamptz[]
);
"""
// mock snapshot data
sql """
- INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
(1,1,100,1000,1.23,4.56,12345.678901,'char','varchar','text
value',true,'2024-01-01','12:00:00','12:00:00+08','2024-01-01
12:00:00','2024-01-01 12:00:00+08','1 day',decode('DEADBEEF',
'hex'),'11111111-2222-3333-4444-555555555555'::uuid,'{"a":1}','{"b":2}','192.168.1.1','192.168.0.0/24','08:00:2b:01:02:03',B'10101010',B'1010',ARRAY[1,2,3],ARRAY['a','b','c'],'(1,2)');
+ INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (
+ 1,
+ ARRAY[1::int2, 2::int2],
+ ARRAY[10::int4, 20::int4],
+ ARRAY[100::int8, 200::int8],
+ ARRAY[1.1::float4, 2.2::float4],
+ ARRAY[1.11::double precision, 2.22::double precision],
+ ARRAY[true, false],
+ ARRAY['foo'::varchar, 'bar'::varchar],
+ ARRAY['hello', 'world'],
+ ARRAY['2024-01-01 12:00:00'::timestamp, '2024-06-01
00:00:00'::timestamp],
+ ARRAY['2024-01-01 12:00:00+08'::timestamptz, '2024-06-01
00:00:00+00'::timestamptz]
+ );
"""
}
@@ -93,7 +85,7 @@ suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,ex
"password" = "${pgPassword}",
"database" = "${pgDB}",
"schema" = "${pgSchema}",
- "include_tables" = "${table1}",
+ "include_tables" = "${table1}",
"offset" = "initial"
)
TO DATABASE ${currentDb} (
@@ -108,7 +100,6 @@ suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,ex
{
def jobSuccendCount = sql """ select SucceedTaskCount
from jobs("type"="insert") where Name = '${jobName}' and
ExecuteType='STREAMING' """
log.info("jobSuccendCount: " + jobSuccendCount)
- // check job status and succeed task count larger than
1
jobSuccendCount.size() == 1 && '1' <=
jobSuccendCount.get(0).get(0)
}
)
@@ -120,24 +111,37 @@ suite("test_streaming_postgres_job_all_type",
"p0,external,pg,external_docker,ex
throw ex;
}
- qt_desc_all_types_null """desc ${currentDb}.${table1};"""
- qt_select_all_types_null """select * from ${currentDb}.${table1} order
by 1;"""
+ qt_desc_array_types """desc ${currentDb}.${table1};"""
+ qt_select_array_types """select * from ${currentDb}.${table1} order by
1;"""
- // mock incremental into
+ // mock incremental data
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
- sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
(2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another
text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01
23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF',
'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)');"""
+ sql """
+ INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (
+ 2,
+ ARRAY[3::int2, 4::int2],
+ ARRAY[30::int4, 40::int4],
+ ARRAY[300::int8, 400::int8],
+ ARRAY[3.3::float4, 4.4::float4],
+ ARRAY[3.33::double precision, 4.44::double precision],
+ ARRAY[false, true],
+ ARRAY['baz'::varchar, 'qux'::varchar],
+ ARRAY['foo', 'bar'],
+ ARRAY['2025-01-01 06:00:00'::timestamp, '2025-06-01
18:00:00'::timestamp],
+ ARRAY['2025-01-01 06:00:00+08'::timestamptz, '2025-06-01
18:00:00+00'::timestamptz]
+ );
+ """
}
sleep(60000); // wait for cdc incremental data
- // check incremental data
- qt_select_all_types_null2 """select * from ${currentDb}.${table1}
order by 1;"""
+ qt_select_array_types2 """select * from ${currentDb}.${table1} order
by 1;"""
sql """
- DROP JOB IF EXISTS where jobname = '${jobName}'
+ DROP JOB IF EXISTS where jobname = '${jobName}'
"""
- def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
assert jobCountRsp.get(0).get(0) == 0
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]