This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 26e98e92fb4 [regression](streamingjob) add log for pg case (#60624)
26e98e92fb4 is described below

commit 26e98e92fb46d3c7094612284542233fc6b80dff
Author: wudi <[email protected]>
AuthorDate: Tue Feb 10 19:53:37 2026 +0800

    [regression](streamingjob) add log for pg case (#60624)
    
    ### What problem does this PR solve?
    
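    Add diagnostic logging to the PostgreSQL streaming-job sync cases: log
    the xmin/xmax of newly inserted rows, extend the CDC wait in
    test_streaming_postgres_job from 30s to 60s, drop the SET search_path
    statement from the partition case, and make the canceled-task log
    message in StreamingMultiTblTask more specific.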
---
 .../job/extensions/insert/streaming/StreamingMultiTblTask.java    | 2 +-
 .../job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy   | 8 ++++++--
 .../cdc/test_streaming_postgres_job_partition.groovy              | 8 ++++++--
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 3061cb80c58..b6a4e8c939b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -113,7 +113,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     @Override
     public void run() throws JobException {
         if (getIsCanceled().get()) {
-            log.info("task has been canceled, task id is {}", getTaskId());
+            log.info("streaming task has been canceled, task id is {}", 
getTaskId());
             return;
         }
         sendWriteRequest();
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index ba2c1247016..15ff0324fa4 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -135,11 +135,13 @@ suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_do
         // mock incremental into
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES 
('Doris',18);"""
+            def xminResult = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE name = 'Doris'; """
+            log.info("xminResult: " + xminResult)
             sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 10 WHERE 
name = 'B1';"""
             sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE name = 
'A1';"""
         }
 
-        sleep(30000); // wait for cdc incremental data
+        sleep(60000); // wait for cdc incremental data
 
         // check incremental data
         qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc 
"""
@@ -156,9 +158,11 @@ suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_do
         // mock incremental into again
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name,age) VALUES 
('Apache',40);"""
+           def xminResult1 = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE name = 'Apache'; """
+            log.info("xminResult1: " + xminResult1)
         }
 
-        sleep(30000); // wait for cdc incremental data
+        sleep(60000); // wait for cdc incremental data
 
         // check incremental data
         qt_select_next_binlog_table1 """ SELECT * FROM ${table1} order by name 
asc """
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
index 2ae0ac1f6db..4a37cb22fcf 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
@@ -111,11 +111,12 @@ suite("test_streaming_postgres_job_partition", 
"p0,external,pg,external_docker,e
 
         // 4. mock insert, update, delete and create new partition
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
-            sql """SET search_path TO ${pgSchema}"""
-
             // insert
             sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
                    VALUES (3, 1003, DATE '2024-01-20');"""
+               
+            def xminResult = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE id = 3"""
+            log.info("xminResult: " + xminResult)
 
             // update
             sql """UPDATE ${pgSchema}.${table1}
@@ -132,6 +133,9 @@ suite("test_streaming_postgres_job_partition", 
"p0,external,pg,external_docker,e
 
             sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
                    VALUES (4, 1004, DATE '2024-03-15');"""
+
+            def xminResult1 = sql """SELECT xmin, xmax , * FROM 
${pgSchema}.${table1} WHERE id = 4"""
+            log.info("xminResult1: " + xminResult1)
         }
 
         // wait for all incremental data 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to