This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 26e98e92fb4 [regression](streamingjob) add log for pg case (#60624)
26e98e92fb4 is described below
commit 26e98e92fb46d3c7094612284542233fc6b80dff
Author: wudi <[email protected]>
AuthorDate: Tue Feb 10 19:53:37 2026 +0800
[regression](streamingjob) add log for pg case (#60624)
### What problem does this PR solve?
Add logging (xmin/xmax snapshot queries) for the PostgreSQL CDC sync regression cases, and extend the incremental-data wait time.
---
.../job/extensions/insert/streaming/StreamingMultiTblTask.java | 2 +-
.../job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy | 8 ++++++--
.../cdc/test_streaming_postgres_job_partition.groovy | 8 ++++++--
3 files changed, 13 insertions(+), 5 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 3061cb80c58..b6a4e8c939b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -113,7 +113,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
@Override
public void run() throws JobException {
if (getIsCanceled().get()) {
- log.info("task has been canceled, task id is {}", getTaskId());
+ log.info("streaming task has been canceled, task id is {}",
getTaskId());
return;
}
sendWriteRequest();
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index ba2c1247016..15ff0324fa4 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -135,11 +135,13 @@ suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_do
// mock incremental into
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES
('Doris',18);"""
+ def xminResult = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE name = 'Doris'; """
+ log.info("xminResult: " + xminResult)
sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 10 WHERE
name = 'B1';"""
sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE name =
'A1';"""
}
- sleep(30000); // wait for cdc incremental data
+ sleep(60000); // wait for cdc incremental data
// check incremental data
qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc
"""
@@ -156,9 +158,11 @@ suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_do
// mock incremental into again
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES
('Apache',40);"""
+ def xminResult1 = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE name = 'Apache'; """
+ log.info("xminResult1: " + xminResult1)
}
- sleep(30000); // wait for cdc incremental data
+ sleep(60000); // wait for cdc incremental data
// check incremental data
qt_select_next_binlog_table1 """ SELECT * FROM ${table1} order by name
asc """
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
index 2ae0ac1f6db..4a37cb22fcf 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
@@ -111,11 +111,12 @@ suite("test_streaming_postgres_job_partition",
"p0,external,pg,external_docker,e
// 4. mock insert, update, delete and create new partition
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
- sql """SET search_path TO ${pgSchema}"""
-
// insert
sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
VALUES (3, 1003, DATE '2024-01-20');"""
+
+ def xminResult = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE id = 3"""
+ log.info("xminResult: " + xminResult)
// update
sql """UPDATE ${pgSchema}.${table1}
@@ -132,6 +133,9 @@ suite("test_streaming_postgres_job_partition",
"p0,external,pg,external_docker,e
sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
VALUES (4, 1004, DATE '2024-03-15');"""
+
+ def xminResult1 = sql """SELECT xmin, xmax , * FROM
${pgSchema}.${table1} WHERE id = 4"""
+ log.info("xminResult1: " + xminResult1)
}
// wait for all incremental data
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]