This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cd02a57e68 [Improve](streaming job) support custom table name mapping 
for CDC streaming job  (#61317)
0cd02a57e68 is described below

commit 0cd02a57e68a3a5965acab400e8edbe18a5c6aed
Author: wudi <[email protected]>
AuthorDate: Mon Mar 23 10:28:57 2026 +0800

    [Improve](streaming job) support custom table name mapping for CDC 
streaming job  (#61317)
    
    ### What problem does this PR solve?
    
     #### Summary
    
    Add support for mapping upstream (PostgreSQL) table names to custom
    downstream (Doris) table names
    in CDC streaming jobs. Without this feature, the Doris target table must
    have the same name as the
      upstream source table.
    
     #### New configuration
    
    Key format: `"table.<srcTable>.target_table" = "<dstTable>"` in the
    `FROM` clause properties.
    
      ```sql
      CREATE JOB my_job
        ON STREAMING
        FROM POSTGRES (
          ...
          "include_tables" = "pg_orders",
          "table.pg_orders.target_table" = "doris_orders"
        )
        TO DATABASE mydb (...)
    ```
    
      When not configured, behavior is unchanged (target table name = source 
table name).
    
    #### Key design decisions
    
      - generateCreateTableCmds returns LinkedHashMap<srcName, 
CreateTableCommand> so callers can
      distinguish source names (for CDC monitoring) from target names (for DDL) 
— this fixes a bug
      where the CDC split assigner would look up the Doris target table name in 
PostgreSQL
      - Multi-table merge is supported: two source tables can map to the same 
Doris table
    
    #### Test plan
    
      - test_streaming_postgres_job_table_mapping: basic mapping 
(INSERT/UPDATE/DELETE land in mapped table; Doris table
      created with target name, not source name)
      - test_streaming_postgres_job_table_mapping: multi-table merge (two PG 
tables → one Doris table, snapshot +
      incremental)
---
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   1 +
 .../streaming/DataSourceConfigValidator.java       |   5 +
 .../insert/streaming/StreamingInsertJob.java       |  15 +-
 .../apache/doris/job/util/StreamingJobUtils.java   |  25 ++-
 .../cdcclient/service/PipelineCoordinator.java     |   8 +-
 .../deserialize/DebeziumJsonDeserializer.java      |  11 ++
 .../PostgresDebeziumJsonDeserializer.java          |   6 +-
 .../apache/doris/cdcclient/utils/ConfigUtil.java   |  25 +++
 .../test_streaming_postgres_job_table_mapping.out  |  19 ++
 ...est_streaming_postgres_job_table_mapping.groovy | 194 +++++++++++++++++++++
 10 files changed, 298 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 3858d9ebaff..b2bda583beb 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -41,6 +41,7 @@ public class DataSourceConfigKeys {
     // per-table config: key format is "table.<tableName>.<suffix>"
     public static final String TABLE = "table";
     public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX = 
"exclude_columns";
+    public static final String TABLE_TARGET_TABLE_SUFFIX = "target_table";
 
     // target properties
     public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 3bea2c21242..63efaf296cb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -45,6 +45,7 @@ public class DataSourceConfigValidator {
 
     // Known suffixes for per-table config keys (format: 
"table.<tableName>.<suffix>")
     private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = 
Sets.newHashSet(
+            DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
             DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
     );
 
@@ -67,6 +68,10 @@ public class DataSourceConfigValidator {
                 if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
                     throw new IllegalArgumentException("Unknown per-table 
config key: '" + key + "'");
                 }
+                if (value == null || value.trim().isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Value for per-table config key '" + key + "' must 
not be empty");
+                }
                 continue;
             }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index ea5298c026a..be5c70d864a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -92,6 +92,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -255,15 +256,21 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     private List<String> createTableIfNotExists() throws Exception {
         List<String> syncTbls = new ArrayList<>();
-        List<CreateTableCommand> createTblCmds = 
StreamingJobUtils.generateCreateTableCmds(targetDb,
-                dataSourceType, sourceProperties, targetProperties);
+        // Key: source table name (PG/MySQL); Value: CreateTableCommand for 
the Doris target table.
+        // The two names differ when "table.<src>.target_table" is configured.
+        LinkedHashMap<String, CreateTableCommand> createTblCmds =
+                StreamingJobUtils.generateCreateTableCmds(targetDb,
+                        dataSourceType, sourceProperties, targetProperties);
         Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbNullable(targetDb);
         Preconditions.checkNotNull(db, "target database %s does not exist", 
targetDb);
-        for (CreateTableCommand createTblCmd : createTblCmds) {
+        for (Map.Entry<String, CreateTableCommand> entry : 
createTblCmds.entrySet()) {
+            String srcTable = entry.getKey();
+            CreateTableCommand createTblCmd = entry.getValue();
             if 
(!db.isTableExist(createTblCmd.getCreateTableInfo().getTableName())) {
                 createTblCmd.run(ConnectContext.get(), null);
             }
-            syncTbls.add(createTblCmd.getCreateTableInfo().getTableName());
+            // Use the source (upstream) table name so CDC monitors the 
correct PG/MySQL table
+            syncTbls.add(srcTable);
         }
         return syncTbls;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index d2222dad383..9eec0061219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -277,10 +277,20 @@ public class StreamingJobUtils {
         return newProps;
     }
 
-    public static List<CreateTableCommand> generateCreateTableCmds(String 
targetDb, DataSourceType sourceType,
+    /**
+     * Generate CREATE TABLE commands for the Doris target tables.
+     *
+     * <p>Returns a {@link LinkedHashMap} whose key is the <b>source</b> 
(upstream) table name and
+     * whose value is the corresponding {@link CreateTableCommand} that 
creates the Doris target
+     * table (which may have a different name when {@code 
table.<src>.target_table} is configured).
+     * Callers must use the map key as the PG/MySQL source table identifier 
for CDC monitoring and
+     * the {@link CreateTableCommand} value for the actual DDL execution.
+     */
+    public static LinkedHashMap<String, CreateTableCommand> 
generateCreateTableCmds(String targetDb,
+            DataSourceType sourceType,
             Map<String, String> properties, Map<String, String> 
targetProperties)
             throws JobException {
-        List<CreateTableCommand> createtblCmds = new ArrayList<>();
+        LinkedHashMap<String, CreateTableCommand> createtblCmds = new 
LinkedHashMap<>();
         String includeTables = 
properties.get(DataSourceConfigKeys.INCLUDE_TABLES);
         String excludeTables = 
properties.get(DataSourceConfigKeys.EXCLUDE_TABLES);
         List<String> includeTablesList = new ArrayList<>();
@@ -322,6 +332,12 @@ public class StreamingJobUtils {
                 noPrimaryKeyTables.add(table);
             }
 
+            // Resolve target (Doris) table name; defaults to source table 
name if not configured
+            String targetTableName = properties.getOrDefault(
+                    DataSourceConfigKeys.TABLE + "." + table + "."
+                            + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
+                    table).trim();
+
             // Validate and apply exclude_columns for this table
             Set<String> excludeColumns = parseExcludeColumns(properties, 
table);
             if (!excludeColumns.isEmpty()) {
@@ -352,7 +368,7 @@ public class StreamingJobUtils {
                     false, // isTemp
                     InternalCatalog.INTERNAL_CATALOG_NAME, // ctlName
                     targetDb, // dbName
-                    table, // tableName
+                    targetTableName, // tableName
                     columnDefinitions, // columns
                     ImmutableList.of(), // indexes
                     "olap", // engineName
@@ -367,7 +383,8 @@ public class StreamingJobUtils {
                     ImmutableList.of() // clusterKeyColumnNames
             );
             CreateTableCommand createtblCmd = new 
CreateTableCommand(Optional.empty(), createtblInfo);
-            createtblCmds.add(createtblCmd);
+            // Key: source (PG/MySQL) table name; Value: command that creates 
the Doris target table
+            createtblCmds.put(table, createtblCmd);
         }
         if (createtblCmds.isEmpty()) {
             throw new JobException("Can not found match table in database " + 
database);
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 97aa4b7f5f2..414a1d23797 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
 import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
+import org.apache.doris.cdcclient.utils.ConfigUtil;
 import org.apache.doris.cdcclient.utils.SchemaChangeManager;
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
@@ -249,6 +250,10 @@ public class PipelineCoordinator {
         Map<String, String> deserializeContext = new 
HashMap<>(writeRecordRequest.getConfig());
         deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);
 
+        // Pre-parse source->target table name mappings once for this request
+        Map<String, String> targetTableMappings =
+                
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());
+
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         long scannedRows = 0L;
@@ -338,9 +343,10 @@ public class PipelineCoordinator {
                     }
                     if (!CollectionUtils.isEmpty(result.getRecords())) {
                         String table = extractTable(element);
+                        String dorisTable = 
targetTableMappings.getOrDefault(table, table);
                         for (String record : result.getRecords()) {
                             scannedRows++;
-                            batchStreamLoad.writeRecord(targetDb, table, 
record.getBytes());
+                            batchStreamLoad.writeRecord(targetDb, dorisTable, 
record.getBytes());
                         }
                         // Mark last message as data (not heartbeat)
                         lastMessageIsHeartbeat = false;
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 25b2b544893..7876597660e 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -82,6 +82,8 @@ public class DebeziumJsonDeserializer
     private static ObjectMapper objectMapper = new ObjectMapper();
     @Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
     @Getter @Setter protected Map<TableId, TableChanges.TableChange> 
tableSchemas;
+    // Parsed source->target table name mappings, populated once in init() 
from config
+    protected Map<String, String> targetTableMappingsCache = new HashMap<>();
     // Parsed exclude-column sets per table, populated once in init() from 
config
     protected Map<String, Set<String>> excludeColumnsCache = new HashMap<>();
 
@@ -91,9 +93,18 @@ public class DebeziumJsonDeserializer
     public void init(Map<String, String> props) {
         this.serverTimeZone =
                 
ConfigUtil.getServerTimeZoneFromJdbcUrl(props.get(DataSourceConfigKeys.JDBC_URL));
+        targetTableMappingsCache = 
ConfigUtil.parseAllTargetTableMappings(props);
         excludeColumnsCache = ConfigUtil.parseAllExcludeColumns(props);
     }
 
+    /**
+     * Resolve the Doris target table name for a given upstream (PG) source 
table name. Returns the
+     * mapped name if configured, otherwise returns the source name unchanged.
+     */
+    protected String resolveTargetTable(String srcTable) {
+        return targetTableMappingsCache.getOrDefault(srcTable, srcTable);
+    }
+
     @Override
     public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
             throws IOException {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
index aa1a6e9c7bd..85fdb1ddea7 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -218,7 +218,9 @@ public class PostgresDebeziumJsonDeserializer extends 
DebeziumJsonDeserializer {
                         colName);
                 continue;
             }
-            ddls.add(SchemaChangeHelper.buildDropColumnSql(db, 
tableId.table(), colName));
+            ddls.add(
+                    SchemaChangeHelper.buildDropColumnSql(
+                            db, resolveTargetTable(tableId.table()), colName));
         }
 
         for (Column col : pgAdded) {
@@ -243,7 +245,7 @@ public class PostgresDebeziumJsonDeserializer extends 
DebeziumJsonDeserializer {
             ddls.add(
                     SchemaChangeHelper.buildAddColumnSql(
                             db,
-                            tableId.table(),
+                            resolveTargetTable(tableId.table()),
                             col.name(),
                             colType + nullable,
                             defaultObj != null ? String.valueOf(defaultObj) : 
null,
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 56d9aeac53e..46d581f58e5 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -169,6 +169,31 @@ public class ConfigUtil {
         return result;
     }
 
+    /**
+     * Parse all target-table name mappings from config.
+     *
+     * <p>Scans all keys matching {@code "table.<srcTableName>.target_table"} 
and returns a map from
+     * source table name to target (Doris) table name. Tables without a 
mapping are NOT included;
+     * callers should use {@code getOrDefault(srcTable, srcTable)}.
+     */
+    public static Map<String, String> parseAllTargetTableMappings(Map<String, 
String> config) {
+        String prefix = DataSourceConfigKeys.TABLE + ".";
+        String suffix = "." + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX;
+        Map<String, String> result = new HashMap<>();
+        for (Map.Entry<String, String> entry : config.entrySet()) {
+            String key = entry.getKey();
+            if (key.startsWith(prefix) && key.endsWith(suffix)) {
+                String srcTable = key.substring(prefix.length(), key.length() 
- suffix.length());
+                String rawValue = entry.getValue();
+                String dstTable = rawValue != null ? rawValue.trim() : "";
+                if (!srcTable.isEmpty() && !dstTable.isEmpty()) {
+                    result.put(srcTable, dstTable);
+                }
+            }
+        }
+        return result;
+    }
+
     public static Map<String, String> toStringMap(String json) {
         if (!isJson(json)) {
             return null;
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
new file mode 100644
index 00000000000..8d922a718f1
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot --
+1      Alice
+2      Bob
+
+-- !select_incremental --
+2      Bob_v2
+3      Carol
+
+-- !select_merge_snapshot --
+100    Src1_A
+200    Src2_A
+
+-- !select_merge_incremental --
+100    Src1_A
+101    Src1_B
+200    Src2_A
+201    Src2_B
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
new file mode 100644
index 00000000000..b31805e682a
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_table_mapping.groovy
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_table_mapping", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName         = "test_streaming_pg_table_mapping"
+    def jobNameMerge    = "test_streaming_pg_table_mapping_merge"
+    def currentDb       = (sql "select database()")[0][0]
+    def pgSrcTable      = "pg_src_table"        // upstream PG table name
+    def dorisDstTable   = "doris_dst_table"     // downstream Doris table name 
(mapped)
+    def pgSrcTable2     = "pg_src_table2"       // second upstream table 
(multi-table merge)
+    def dorisMergeTable = "doris_merge_table"   // both PG tables merge into 
this Doris table
+    def pgDB            = "postgres"
+    def pgSchema        = "cdc_test"
+    def pgUser          = "postgres"
+    def pgPassword      = "123456"
+
+    // Cleanup
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+    sql """drop table if exists ${currentDb}.${dorisDstTable} force"""
+    sql """drop table if exists ${currentDb}.${dorisMergeTable} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port       = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ── Case 1: basic table name mapping 
─────────────────────────────────
+        // PG table: pg_src_table → Doris table: doris_dst_table
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (1, 
'Alice')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (2, 
'Bob')"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${pgSrcTable}",
+                    "offset"         = "initial",
+                    "table.${pgSrcTable}.target_table" = "${dorisDstTable}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Verify the Doris table was created with the mapped name, not the 
source name
+        def tables = (sql """show tables from ${currentDb}""").collect { it[0] 
}
+        assert tables.contains(dorisDstTable) : "Doris target table 
'${dorisDstTable}' should exist"
+        assert !tables.contains(pgSrcTable)   : "Source table name 
'${pgSrcTable}' must NOT exist in Doris"
+
+        // Wait for snapshot
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+                cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_select_snapshot """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC 
"""
+
+        // Incremental: INSERT / UPDATE / DELETE must all land in 
doris_dst_table
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable} VALUES (3, 
'Carol')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${pgSrcTable} SET name = 
'Bob_v2' WHERE id = 2"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${pgSrcTable} WHERE id = 
1"""
+        }
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisDstTable} ORDER BY id 
ASC """).collect { it[0].toInteger() }
+                ids.contains(3) && !ids.contains(1)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobName}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_select_incremental """ SELECT * FROM ${dorisDstTable} ORDER BY id 
ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        // ── Case 2: multi-table merge (two PG tables → one Doris table) 
──────
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgSrcTable2}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgSrcTable2} (
+                      "id"   int,
+                      "name" varchar(200),
+                      PRIMARY KEY ("id")
+                    )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable}  VALUES (100, 
'Src1_A')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (200, 
'Src2_A')"""
+        }
+
+        sql """CREATE JOB ${jobNameMerge}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${pgSrcTable},${pgSrcTable2}",
+                    "offset"         = "initial",
+                    "table.${pgSrcTable}.target_table"  = "${dorisMergeTable}",
+                    "table.${pgSrcTable2}.target_table" = "${dorisMergeTable}"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Wait for snapshot rows from both source tables
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisMergeTable} 
""").collect { it[0].toInteger() }
+                ids.contains(100) && ids.contains(200)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+            throw ex
+        }
+
+        qt_select_merge_snapshot """ SELECT * FROM ${dorisMergeTable} ORDER BY 
id ASC """
+
+        // Incremental from both source tables
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable}  VALUES (101, 
'Src1_B')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgSrcTable2} VALUES (201, 
'Src2_B')"""
+        }
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def ids = (sql """ SELECT id FROM ${dorisMergeTable} 
""").collect { it[0].toInteger() }
+                ids.contains(101) && ids.contains(201)
+            })
+        } catch (Exception ex) {
+            log.info("show job: " + (sql """select * from 
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+            log.info("show task: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+            throw ex
+        }
+
+        qt_select_merge_incremental """ SELECT * FROM ${dorisMergeTable} ORDER 
BY id ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+        def mergeJobCnt = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobNameMerge}'"""
+        assert mergeJobCnt.get(0).get(0) == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to