This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 23b290dbfdc branch-4.1: [Improve](Streamingjob) support 
exclude_columns for Postgres streaming job      #61267 (#61537)
23b290dbfdc is described below

commit 23b290dbfdc8c47928d5523510e34fdb971f65ad
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 20 13:06:52 2026 +0800

    branch-4.1: [Improve](Streamingjob) support exclude_columns for Postgres 
streaming job      #61267 (#61537)
    
    Cherry-picked from #61267
    
    Co-authored-by: wudi <[email protected]>
---
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   4 +
 .../streaming/DataSourceConfigValidator.java       |  22 ++
 .../apache/doris/job/util/StreamingJobUtils.java   |  43 ++++
 .../deserialize/DebeziumJsonDeserializer.java      |  42 +++-
 .../PostgresDebeziumJsonDeserializer.java          |  24 +++
 .../apache/doris/cdcclient/utils/ConfigUtil.java   |  53 +++++
 .../cdc/test_streaming_postgres_job_col_filter.out |  20 ++
 .../cdc/test_streaming_mysql_job_dup.groovy        |   2 +-
 .../test_streaming_postgres_job_col_filter.groovy  | 227 +++++++++++++++++++++
 9 files changed, 426 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 77ce8b2bf51..3858d9ebaff 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -38,6 +38,10 @@ public class DataSourceConfigKeys {
     public static final String SSL_MODE = "ssl_mode";
     public static final String SSL_ROOTCERT = "ssl_rootcert";
 
+    // per-table config: key format is "table.<tableName>.<suffix>"
+    public static final String TABLE = "table";
+    public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX = 
"exclude_columns";
+
     // target properties
     public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
     public static final String LOAD_PROPERTIES = "load.";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index f3d9e016950..3bea2c21242 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -43,11 +43,33 @@ public class DataSourceConfigValidator {
             DataSourceConfigKeys.SSL_ROOTCERT
     );
 
+    // Known suffixes for per-table config keys (format: 
"table.<tableName>.<suffix>")
+    private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = 
Sets.newHashSet(
+            DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
+    );
+
+    private static final String TABLE_LEVEL_PREFIX = 
DataSourceConfigKeys.TABLE + ".";
+
     public static void validateSource(Map<String, String> input) throws 
IllegalArgumentException {
         for (Map.Entry<String, String> entry : input.entrySet()) {
             String key = entry.getKey();
             String value = entry.getValue();
 
+            if (key.startsWith(TABLE_LEVEL_PREFIX)) {
+                // per-table config key must be exactly: 
table.<tableName>.<suffix>
+                // reject malformed keys like "table.exclude_columns" (missing 
tableName)
+                String[] parts = key.split("\\.", -1);
+                if (parts.length != 3 || parts[1].isEmpty()) {
+                    throw new IllegalArgumentException("Malformed per-table 
config key: '" + key
+                            + "'. Expected format: 
table.<tableName>.<suffix>");
+                }
+                String suffix = parts[parts.length - 1];
+                if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
+                    throw new IllegalArgumentException("Unknown per-table 
config key: '" + key + "'");
+                }
+                continue;
+            }
+
             if (!ALLOW_SOURCE_KEYS.contains(key)) {
                 throw new IllegalArgumentException("Unexpected key: '" + key + 
"'");
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 7299f0a9b50..d2222dad383 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -63,12 +63,14 @@ import org.apache.commons.text.StringSubstitutor;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -319,6 +321,16 @@ public class StreamingJobUtils {
             if (primaryKeys.isEmpty()) {
                 noPrimaryKeyTables.add(table);
             }
+
+            // Validate and apply exclude_columns for this table
+            Set<String> excludeColumns = parseExcludeColumns(properties, 
table);
+            if (!excludeColumns.isEmpty()) {
+                validateExcludeColumns(excludeColumns, table, columns, 
primaryKeys);
+                columns = columns.stream()
+                        .filter(col -> !excludeColumns.contains(col.getName()))
+                        .collect(Collectors.toList());
+            }
+
             // Convert Column to ColumnDefinition
             List<ColumnDefinition> columnDefinitions = 
columns.stream().map(col -> {
                 DataType dataType = DataType.fromCatalogType(col.getType());
@@ -437,6 +449,37 @@ public class StreamingJobUtils {
         return remoteDb;
     }
 
+    private static Set<String> parseExcludeColumns(Map<String, String> 
properties, String tableName) {
+        String key = DataSourceConfigKeys.TABLE + "." + tableName + "."
+                + DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX;
+        String value = properties.get(key);
+        if (StringUtils.isEmpty(value)) {
+            return Collections.emptySet();
+        }
+        return Arrays.stream(value.split(","))
+                .map(String::trim)
+                .filter(s -> !s.isEmpty())
+                .collect(Collectors.toSet());
+    }
+
+    private static void validateExcludeColumns(Set<String> excludeColumns, 
String tableName,
+            List<Column> columns, List<String> primaryKeys) throws 
JobException {
+        Set<String> colNames = 
columns.stream().map(Column::getName).collect(Collectors.toSet());
+        for (String col : excludeColumns) {
+            if (!colNames.contains(col)) {
+                throw new JobException(String.format(
+                        "exclude_columns validation failed: column '%s' does 
not exist in table '%s'",
+                        col, tableName));
+            }
+            if (primaryKeys.contains(col)) {
+                throw new JobException(String.format(
+                        "exclude_columns validation failed: column '%s' in 
table '%s'"
+                                + " is a primary key column and cannot be 
excluded",
+                        col, tableName));
+            }
+        }
+    }
+
     private static Map<String, String> getTableCreateProperties(Map<String, 
String> properties) {
         final Map<String, String> tableCreateProps = new HashMap<>();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 065a3da2c09..1ec6da91fa6 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -38,12 +38,15 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
 import static org.apache.doris.cdcclient.common.Constants.DORIS_DELETE_SIGN;
 
 import com.esri.core.geometry.ogc.OGCGeometry;
@@ -79,6 +82,8 @@ public class DebeziumJsonDeserializer
     private static ObjectMapper objectMapper = new ObjectMapper();
     @Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
     @Getter @Setter protected Map<TableId, TableChanges.TableChange> 
tableSchemas;
+    // Parsed exclude-column sets per table, populated once in init() from 
config
+    protected Map<String, Set<String>> excludeColumnsCache = new HashMap<>();
 
     public DebeziumJsonDeserializer() {}
 
@@ -86,6 +91,7 @@ public class DebeziumJsonDeserializer
     public void init(Map<String, String> props) {
         this.serverTimeZone =
                 
ConfigUtil.getServerTimeZoneFromJdbcUrl(props.get(DataSourceConfigKeys.JDBC_URL));
+        excludeColumnsCache = ConfigUtil.parseAllExcludeColumns(props);
     }
 
     @Override
@@ -102,18 +108,21 @@ public class DebeziumJsonDeserializer
 
     private List<String> deserializeDataChangeRecord(SourceRecord record) 
throws IOException {
         List<String> rows = new ArrayList<>();
+        String tableName = extractTableName(record);
+        Set<String> excludeColumns =
+                excludeColumnsCache.getOrDefault(tableName, 
Collections.emptySet());
         Envelope.Operation op = Envelope.operationFor(record);
         Struct value = (Struct) record.value();
         Schema valueSchema = record.valueSchema();
         if (Envelope.Operation.DELETE.equals(op)) {
-            String deleteRow = extractBeforeRow(value, valueSchema);
+            String deleteRow = extractBeforeRow(value, valueSchema, 
excludeColumns);
             if (StringUtils.isNotEmpty(deleteRow)) {
                 rows.add(deleteRow);
             }
         } else if (Envelope.Operation.READ.equals(op)
                 || Envelope.Operation.CREATE.equals(op)
                 || Envelope.Operation.UPDATE.equals(op)) {
-            String insertRow = extractAfterRow(value, valueSchema);
+            String insertRow = extractAfterRow(value, valueSchema, 
excludeColumns);
             if (StringUtils.isNotEmpty(insertRow)) {
                 rows.add(insertRow);
             }
@@ -121,7 +130,12 @@ public class DebeziumJsonDeserializer
         return rows;
     }
 
-    private String extractAfterRow(Struct value, Schema valueSchema)
+    private String extractTableName(SourceRecord record) {
+        Struct value = (Struct) record.value();
+        return 
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
+    }
+
+    private String extractAfterRow(Struct value, Schema valueSchema, 
Set<String> excludeColumns)
             throws JsonProcessingException {
         Map<String, Object> record = new HashMap<>();
         Struct after = value.getStruct(Envelope.FieldName.AFTER);
@@ -133,15 +147,19 @@ public class DebeziumJsonDeserializer
                 .fields()
                 .forEach(
                         field -> {
-                            Object valueConverted =
-                                    convert(field.schema(), 
after.getWithoutDefault(field.name()));
-                            record.put(field.name(), valueConverted);
+                            if (!excludeColumns.contains(field.name())) {
+                                Object valueConverted =
+                                        convert(
+                                                field.schema(),
+                                                
after.getWithoutDefault(field.name()));
+                                record.put(field.name(), valueConverted);
+                            }
                         });
         record.put(DORIS_DELETE_SIGN, 0);
         return objectMapper.writeValueAsString(record);
     }
 
-    private String extractBeforeRow(Struct value, Schema valueSchema)
+    private String extractBeforeRow(Struct value, Schema valueSchema, 
Set<String> excludeColumns)
             throws JsonProcessingException {
         Map<String, Object> record = new HashMap<>();
         Struct before = value.getStruct(Envelope.FieldName.BEFORE);
@@ -153,9 +171,13 @@ public class DebeziumJsonDeserializer
                 .fields()
                 .forEach(
                         field -> {
-                            Object valueConverted =
-                                    convert(field.schema(), 
before.getWithoutDefault(field.name()));
-                            record.put(field.name(), valueConverted);
+                            if (!excludeColumns.contains(field.name())) {
+                                Object valueConverted =
+                                        convert(
+                                                field.schema(),
+                                                
before.getWithoutDefault(field.name()));
+                                record.put(field.name(), valueConverted);
+                            }
                         });
         record.put(DORIS_DELETE_SIGN, 1);
         return objectMapper.writeValueAsString(record);
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
index 2dc2310054b..aa1a6e9c7bd 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -203,12 +204,35 @@ public class PostgresDebeziumJsonDeserializer extends 
DebeziumJsonDeserializer {
         // Generate DDLs using accurate PG column types
         String db = context.get(Constants.DORIS_TARGET_DB);
         List<String> ddls = new ArrayList<>();
+        Set<String> excludedCols =
+                excludeColumnsCache.getOrDefault(tableId.table(), 
Collections.emptySet());
 
         for (String colName : pgDropped) {
+            if (excludedCols.contains(colName)) {
+                // The column is excluded from sync — skip DDL; updatedSchemas 
already
+                // reflects the drop since it is built from afterSchema.
+                LOG.info(
+                        "[SCHEMA-CHANGE] Table {}: dropped column '{}' is 
excluded from sync,"
+                                + " skipping DROP DDL",
+                        tableId.identifier(),
+                        colName);
+                continue;
+            }
             ddls.add(SchemaChangeHelper.buildDropColumnSql(db, 
tableId.table(), colName));
         }
 
         for (Column col : pgAdded) {
+            if (excludedCols.contains(col.name())) {
+                // The column is excluded from sync — Doris table does not 
have it,
+                // so skip the ADD DDL.
+                // case: An excluded column was dropped and then re-added.
+                LOG.info(
+                        "[SCHEMA-CHANGE] Table {}: added column '{}' is 
excluded from sync,"
+                                + " skipping ADD DDL",
+                        tableId.identifier(),
+                        col.name());
+                continue;
+            }
             String colType = SchemaChangeHelper.columnToDorisType(col);
             String nullable = col.isOptional() ? "" : " NOT NULL";
             // pgAdded only contains columns present in afterSchema, so field 
lookup is safe.
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 122cb424f0e..56d9aeac53e 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -17,11 +17,18 @@
 
 package org.apache.doris.cdcclient.utils;
 
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
 import org.apache.commons.lang3.StringUtils;
 
 import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -116,6 +123,52 @@ public class ConfigUtil {
         }
     }
 
+    /**
+     * Parse the exclude-column set for a specific table from config.
+     *
+     * <p>Looks for key {@code "table.<tableName>.exclude_columns"} whose 
value is a comma-separated
+     * column list, e.g. {@code "secret,internal_note"}.
+     *
+     * @return column name set (original case preserved); empty set when the 
key is absent
+     */
+    public static Set<String> parseExcludeColumns(Map<String, String> config, 
String tableName) {
+        String key =
+                DataSourceConfigKeys.TABLE
+                        + "."
+                        + tableName
+                        + "."
+                        + DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX;
+        String value = config.get(key);
+        if (StringUtils.isEmpty(value)) {
+            return Collections.emptySet();
+        }
+        return Arrays.stream(value.split(","))
+                .map(String::trim)
+                .filter(s -> !s.isEmpty())
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Parse all per-table exclude-column sets from config at once.
+     *
+     * <p>Scans all keys matching {@code "table.<tableName>.exclude_columns"} 
and returns a map from
+     * table name to its excluded column set. Intended to be called once 
during initialization.
+     */
+    public static Map<String, Set<String>> parseAllExcludeColumns(Map<String, 
String> config) {
+        String prefix = DataSourceConfigKeys.TABLE + ".";
+        String suffix = "." + 
DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX;
+        Map<String, Set<String>> result = new HashMap<>();
+        for (String key : config.keySet()) {
+            if (key.startsWith(prefix) && key.endsWith(suffix)) {
+                String tableName = key.substring(prefix.length(), key.length() 
- suffix.length());
+                if (!tableName.isEmpty()) {
+                    result.put(tableName, parseExcludeColumns(config, 
tableName));
+                }
+            }
+        }
+        return result;
+    }
+
     public static Map<String, String> toStringMap(String json) {
         if (!isJson(json)) {
             return null;
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.out
new file mode 100644
index 00000000000..751b83e6525
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.out
@@ -0,0 +1,20 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot --
+A1     1
+B1     2
+
+-- !select_incremental --
+B1     20
+C1     3
+
+-- !select_after_drop_excluded --
+B1     20
+C1     3
+D1     4
+
+-- !select_after_readd_excluded --
+B1     20
+C1     3
+D1     4
+E1     5
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
index ecfd4a36cf3..2ddacebde16 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
@@ -16,7 +16,7 @@
 // under the License.
 
 suite("test_streaming_mysql_job_dup", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
-    def jobName = "test_streaming_mysql_job_name"
+    def jobName = "test_streaming_mysql_job_name_dup"
     def currentDb = (sql "select database()")[0][0]
     def table1 = "test_streaming_mysql_job_dup"
     def mysqlDb = "test_cdc_db"
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.groovy
new file mode 100644
index 00000000000..b90995c9b40
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.groovy
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_col_filter", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_pg_col_filter"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_col_filter"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}_err1'"""
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}_err2'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // Create PG table with an extra "secret" column to be excluded
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                      "name"   varchar(200),
+                      "age"    int2,
+                      "secret" varchar(200),
+                      PRIMARY KEY ("name")
+                    )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES ('A1', 1, 
'secret_A1')"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES ('B1', 2, 
'secret_B1')"""
+        }
+
+        // ── Validation: exclude a non-existent column should fail 
──────────────
+        try {
+            sql """CREATE JOB ${jobName}_err1
+                    ON STREAMING
+                    FROM POSTGRES (
+                        "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                        "driver_url"     = "${driver_url}",
+                        "driver_class"   = "org.postgresql.Driver",
+                        "user"           = "${pgUser}",
+                        "password"       = "${pgPassword}",
+                        "database"       = "${pgDB}",
+                        "schema"         = "${pgSchema}",
+                        "include_tables" = "${table1}",
+                        "offset"         = "initial",
+                        "table.${table1}.exclude_columns" = "nonexistent_col"
+                    )
+                    TO DATABASE ${currentDb} (
+                        "table.create.properties.replication_num" = "1"
+                    )"""
+            assert false : "Should have thrown exception for non-existent 
excluded column"
+        } catch (Exception e) {
+            log.info("Expected error for non-existent column: " + e.message)
+            assert e.message.contains("does not exist") : "Unexpected error 
message: " + e.message
+        }
+
+        // ── Validation: exclude a PK column should fail 
────────────────────────
+        try {
+            sql """CREATE JOB ${jobName}_err2
+                    ON STREAMING
+                    FROM POSTGRES (
+                        "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                        "driver_url"     = "${driver_url}",
+                        "driver_class"   = "org.postgresql.Driver",
+                        "user"           = "${pgUser}",
+                        "password"       = "${pgPassword}",
+                        "database"       = "${pgDB}",
+                        "schema"         = "${pgSchema}",
+                        "include_tables" = "${table1}",
+                        "offset"         = "initial",
+                        "table.${table1}.exclude_columns" = "name"
+                    )
+                    TO DATABASE ${currentDb} (
+                        "table.create.properties.replication_num" = "1"
+                    )"""
+            assert false : "Should have thrown exception for excluding PK 
column"
+        } catch (Exception e) {
+            log.info("Expected error for PK column: " + e.message)
+            assert e.message.contains("primary key") : "Unexpected error 
message: " + e.message
+        }
+
+        // ── Main job: exclude "secret" column 
──────────────────────────────────
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset"         = "initial",
+                    "table.${table1}.exclude_columns" = "secret"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Verify Doris table was created WITHOUT the excluded column
+        def colNames = (sql """desc ${currentDb}.${table1}""").collect { it[0] 
}
+        assert !colNames.contains("secret") : "Excluded column 'secret' must 
not appear in Doris table"
+        assert colNames.contains("name")
+        assert colNames.contains("age")
+
+        // Wait for snapshot to complete
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+                cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+            })
+        } catch (Exception ex) {
+            def showJob  = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showJob)
+            log.info("show task: " + showTask)
+            throw ex
+        }
+
+        // Snapshot: only name and age, secret absent
+        qt_select_snapshot """ SELECT * FROM ${table1} ORDER BY name ASC """
+
+        // ── Incremental DML: secret values must not appear in Doris 
───────────
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES ('C1', 3, 
'secret_C1')"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 20, secret = 
'updated_secret' WHERE name = 'B1'"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE name = 
'A1'"""
+        }
+        // Wait until C1 appears and A1 is gone
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def names = (sql """ SELECT name FROM ${table1} ORDER BY name 
ASC """).collect { it[0] }
+                names.contains('C1') && !names.contains('A1')
+            })
+        } catch (Exception ex) {
+            def showJob  = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showJob)
+            log.info("show task: " + showTask)
+            throw ex
+        }
+
+        qt_select_incremental """ SELECT * FROM ${table1} ORDER BY name ASC """
+
+        // ── Schema change: DROP excluded column → DDL skipped, sync 
continues ─
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgDB}.${pgSchema}.${table1} DROP COLUMN 
secret"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('D1', 4)"""
+        }
+        // Wait until D1 appears (schema change was skipped, sync should 
continue normally)
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def names = (sql """ SELECT name FROM ${table1} ORDER BY name 
ASC """).collect { it[0] }
+                names.contains('D1')
+            })
+        } catch (Exception ex) {
+            def showJob  = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showJob)
+            log.info("show task: " + showTask)
+            throw ex
+        }
+
+        // Doris table still has no secret column (DDL was skipped)
+        def colNamesAfterDrop = (sql """desc 
${currentDb}.${table1}""").collect { it[0] }
+        assert !colNamesAfterDrop.contains("secret") : "secret column must not 
appear in Doris after DROP"
+
+        qt_select_after_drop_excluded """ SELECT * FROM ${table1} ORDER BY 
name ASC """
+
+        // ── Schema change: re-ADD excluded column → DDL also skipped 
──────────
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgDB}.${pgSchema}.${table1} ADD COLUMN secret 
varchar(200)"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET secret = 
're_secret_C1' WHERE name = 'C1'"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age, 
secret) VALUES ('E1', 5, 'secret_E1')"""
+        }
+        // Wait until E1 appears (re-ADD DDL skipped, sync should continue 
normally)
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def names = (sql """ SELECT name FROM ${table1} ORDER BY name 
ASC """).collect { it[0] }
+                names.contains('E1')
+            })
+        } catch (Exception ex) {
+            def showJob  = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showJob)
+            log.info("show task: " + showTask)
+            throw ex
+        }
+
+        // Doris table still does NOT have secret column (ADD DDL skipped)
+        def colNamesAfterReAdd = (sql """desc 
${currentDb}.${table1}""").collect { it[0] }
+        assert !colNamesAfterReAdd.contains("secret") : "secret column must 
not appear in Doris after re-ADD"
+
+        qt_select_after_readd_excluded """ SELECT * FROM ${table1} ORDER BY 
name ASC """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name = '${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to