This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 23b290dbfdc branch-4.1: [Improve](Streamingjob) support
exclude_columns for Postgres streaming job #61267 (#61537)
23b290dbfdc is described below
commit 23b290dbfdc8c47928d5523510e34fdb971f65ad
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 20 13:06:52 2026 +0800
branch-4.1: [Improve](Streamingjob) support exclude_columns for Postgres
streaming job #61267 (#61537)
Cherry-picked from #61267
Co-authored-by: wudi <[email protected]>
---
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 4 +
.../streaming/DataSourceConfigValidator.java | 22 ++
.../apache/doris/job/util/StreamingJobUtils.java | 43 ++++
.../deserialize/DebeziumJsonDeserializer.java | 42 +++-
.../PostgresDebeziumJsonDeserializer.java | 24 +++
.../apache/doris/cdcclient/utils/ConfigUtil.java | 53 +++++
.../cdc/test_streaming_postgres_job_col_filter.out | 20 ++
.../cdc/test_streaming_mysql_job_dup.groovy | 2 +-
.../test_streaming_postgres_job_col_filter.groovy | 227 +++++++++++++++++++++
9 files changed, 426 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 77ce8b2bf51..3858d9ebaff 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -38,6 +38,10 @@ public class DataSourceConfigKeys {
public static final String SSL_MODE = "ssl_mode";
public static final String SSL_ROOTCERT = "ssl_rootcert";
+ // per-table config: key format is "table.<tableName>.<suffix>"
+ public static final String TABLE = "table";
+ public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX =
"exclude_columns";
+
// target properties
public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
public static final String LOAD_PROPERTIES = "load.";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index f3d9e016950..3bea2c21242 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -43,11 +43,33 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.SSL_ROOTCERT
);
+ // Known suffixes for per-table config keys (format:
"table.<tableName>.<suffix>")
+ private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES =
Sets.newHashSet(
+ DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
+ );
+
+ private static final String TABLE_LEVEL_PREFIX =
DataSourceConfigKeys.TABLE + ".";
+
public static void validateSource(Map<String, String> input) throws
IllegalArgumentException {
for (Map.Entry<String, String> entry : input.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
+ if (key.startsWith(TABLE_LEVEL_PREFIX)) {
+ // per-table config key must be exactly:
table.<tableName>.<suffix>
+ // reject malformed keys like "table.exclude_columns" (missing
tableName)
+ String[] parts = key.split("\\.", -1);
+ if (parts.length != 3 || parts[1].isEmpty()) {
+ throw new IllegalArgumentException("Malformed per-table
config key: '" + key
+ + "'. Expected format:
table.<tableName>.<suffix>");
+ }
+ String suffix = parts[parts.length - 1];
+ if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
+ throw new IllegalArgumentException("Unknown per-table
config key: '" + key + "'");
+ }
+ continue;
+ }
+
if (!ALLOW_SOURCE_KEYS.contains(key)) {
throw new IllegalArgumentException("Unexpected key: '" + key +
"'");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 7299f0a9b50..d2222dad383 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -63,12 +63,14 @@ import org.apache.commons.text.StringSubstitutor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -319,6 +321,16 @@ public class StreamingJobUtils {
if (primaryKeys.isEmpty()) {
noPrimaryKeyTables.add(table);
}
+
+ // Validate and apply exclude_columns for this table
+ Set<String> excludeColumns = parseExcludeColumns(properties,
table);
+ if (!excludeColumns.isEmpty()) {
+ validateExcludeColumns(excludeColumns, table, columns,
primaryKeys);
+ columns = columns.stream()
+ .filter(col -> !excludeColumns.contains(col.getName()))
+ .collect(Collectors.toList());
+ }
+
// Convert Column to ColumnDefinition
List<ColumnDefinition> columnDefinitions =
columns.stream().map(col -> {
DataType dataType = DataType.fromCatalogType(col.getType());
@@ -437,6 +449,37 @@ public class StreamingJobUtils {
return remoteDb;
}
+ private static Set<String> parseExcludeColumns(Map<String, String>
properties, String tableName) {
+ String key = DataSourceConfigKeys.TABLE + "." + tableName + "."
+ + DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX;
+ String value = properties.get(key);
+ if (StringUtils.isEmpty(value)) {
+ return Collections.emptySet();
+ }
+ return Arrays.stream(value.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toSet());
+ }
+
+ private static void validateExcludeColumns(Set<String> excludeColumns,
String tableName,
+ List<Column> columns, List<String> primaryKeys) throws
JobException {
+ Set<String> colNames =
columns.stream().map(Column::getName).collect(Collectors.toSet());
+ for (String col : excludeColumns) {
+ if (!colNames.contains(col)) {
+ throw new JobException(String.format(
+ "exclude_columns validation failed: column '%s' does
not exist in table '%s'",
+ col, tableName));
+ }
+ if (primaryKeys.contains(col)) {
+ throw new JobException(String.format(
+ "exclude_columns validation failed: column '%s' in
table '%s'"
+ + " is a primary key column and cannot be
excluded",
+ col, tableName));
+ }
+ }
+ }
+
private static Map<String, String> getTableCreateProperties(Map<String,
String> properties) {
final Map<String, String> tableCreateProps = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 065a3da2c09..1ec6da91fa6 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -38,12 +38,15 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.doris.cdcclient.common.Constants.DORIS_DELETE_SIGN;
import com.esri.core.geometry.ogc.OGCGeometry;
@@ -79,6 +82,8 @@ public class DebeziumJsonDeserializer
private static ObjectMapper objectMapper = new ObjectMapper();
@Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
@Getter @Setter protected Map<TableId, TableChanges.TableChange>
tableSchemas;
+ // Parsed exclude-column sets per table, populated once in init() from
config
+ protected Map<String, Set<String>> excludeColumnsCache = new HashMap<>();
public DebeziumJsonDeserializer() {}
@@ -86,6 +91,7 @@ public class DebeziumJsonDeserializer
public void init(Map<String, String> props) {
this.serverTimeZone =
ConfigUtil.getServerTimeZoneFromJdbcUrl(props.get(DataSourceConfigKeys.JDBC_URL));
+ excludeColumnsCache = ConfigUtil.parseAllExcludeColumns(props);
}
@Override
@@ -102,18 +108,21 @@ public class DebeziumJsonDeserializer
private List<String> deserializeDataChangeRecord(SourceRecord record)
throws IOException {
List<String> rows = new ArrayList<>();
+ String tableName = extractTableName(record);
+ Set<String> excludeColumns =
+ excludeColumnsCache.getOrDefault(tableName,
Collections.emptySet());
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (Envelope.Operation.DELETE.equals(op)) {
- String deleteRow = extractBeforeRow(value, valueSchema);
+ String deleteRow = extractBeforeRow(value, valueSchema,
excludeColumns);
if (StringUtils.isNotEmpty(deleteRow)) {
rows.add(deleteRow);
}
} else if (Envelope.Operation.READ.equals(op)
|| Envelope.Operation.CREATE.equals(op)
|| Envelope.Operation.UPDATE.equals(op)) {
- String insertRow = extractAfterRow(value, valueSchema);
+ String insertRow = extractAfterRow(value, valueSchema,
excludeColumns);
if (StringUtils.isNotEmpty(insertRow)) {
rows.add(insertRow);
}
@@ -121,7 +130,12 @@ public class DebeziumJsonDeserializer
return rows;
}
- private String extractAfterRow(Struct value, Schema valueSchema)
+ private String extractTableName(SourceRecord record) {
+ Struct value = (Struct) record.value();
+ return
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
+ }
+
+ private String extractAfterRow(Struct value, Schema valueSchema,
Set<String> excludeColumns)
throws JsonProcessingException {
Map<String, Object> record = new HashMap<>();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
@@ -133,15 +147,19 @@ public class DebeziumJsonDeserializer
.fields()
.forEach(
field -> {
- Object valueConverted =
- convert(field.schema(),
after.getWithoutDefault(field.name()));
- record.put(field.name(), valueConverted);
+ if (!excludeColumns.contains(field.name())) {
+ Object valueConverted =
+ convert(
+ field.schema(),
+
after.getWithoutDefault(field.name()));
+ record.put(field.name(), valueConverted);
+ }
});
record.put(DORIS_DELETE_SIGN, 0);
return objectMapper.writeValueAsString(record);
}
- private String extractBeforeRow(Struct value, Schema valueSchema)
+ private String extractBeforeRow(Struct value, Schema valueSchema,
Set<String> excludeColumns)
throws JsonProcessingException {
Map<String, Object> record = new HashMap<>();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
@@ -153,9 +171,13 @@ public class DebeziumJsonDeserializer
.fields()
.forEach(
field -> {
- Object valueConverted =
- convert(field.schema(),
before.getWithoutDefault(field.name()));
- record.put(field.name(), valueConverted);
+ if (!excludeColumns.contains(field.name())) {
+ Object valueConverted =
+ convert(
+ field.schema(),
+
before.getWithoutDefault(field.name()));
+ record.put(field.name(), valueConverted);
+ }
});
record.put(DORIS_DELETE_SIGN, 1);
return objectMapper.writeValueAsString(record);
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
index 2dc2310054b..aa1a6e9c7bd 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -203,12 +204,35 @@ public class PostgresDebeziumJsonDeserializer extends
DebeziumJsonDeserializer {
// Generate DDLs using accurate PG column types
String db = context.get(Constants.DORIS_TARGET_DB);
List<String> ddls = new ArrayList<>();
+ Set<String> excludedCols =
+ excludeColumnsCache.getOrDefault(tableId.table(),
Collections.emptySet());
for (String colName : pgDropped) {
+ if (excludedCols.contains(colName)) {
+ // The column is excluded from sync — skip DDL; updatedSchemas
already
+ // reflects the drop since it is built from afterSchema.
+ LOG.info(
+ "[SCHEMA-CHANGE] Table {}: dropped column '{}' is
excluded from sync,"
+ + " skipping DROP DDL",
+ tableId.identifier(),
+ colName);
+ continue;
+ }
ddls.add(SchemaChangeHelper.buildDropColumnSql(db,
tableId.table(), colName));
}
for (Column col : pgAdded) {
+ if (excludedCols.contains(col.name())) {
+ // The column is excluded from sync — Doris table does not
have it,
+ // so skip the ADD DDL.
+ // Example: an excluded column was dropped and then re-added.
+ LOG.info(
+ "[SCHEMA-CHANGE] Table {}: added column '{}' is
excluded from sync,"
+ + " skipping ADD DDL",
+ tableId.identifier(),
+ col.name());
+ continue;
+ }
String colType = SchemaChangeHelper.columnToDorisType(col);
String nullable = col.isOptional() ? "" : " NOT NULL";
// pgAdded only contains columns present in afterSchema, so field
lookup is safe.
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 122cb424f0e..56d9aeac53e 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -17,11 +17,18 @@
package org.apache.doris.cdcclient.utils;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
import org.apache.commons.lang3.StringUtils;
import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -116,6 +123,52 @@ public class ConfigUtil {
}
}
+ /**
+ * Parse the exclude-column set for a specific table from config.
+ *
+ * <p>Looks for key {@code "table.<tableName>.exclude_columns"} whose
value is a comma-separated
+ * column list, e.g. {@code "secret,internal_note"}.
+ *
+ * @return column name set (original case preserved); empty set when the
key is absent or its value is empty
+ */
+ public static Set<String> parseExcludeColumns(Map<String, String> config,
String tableName) {
+ String key =
+ DataSourceConfigKeys.TABLE
+ + "."
+ + tableName
+ + "."
+ + DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX;
+ String value = config.get(key);
+ if (StringUtils.isEmpty(value)) {
+ return Collections.emptySet();
+ }
+ return Arrays.stream(value.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Parse all per-table exclude-column sets from config at once.
+ *
+ * <p>Scans all keys matching {@code "table.<tableName>.exclude_columns"}
and returns a map from
+ * table name to its excluded column set. Intended to be called once
during initialization.
+ */
+ public static Map<String, Set<String>> parseAllExcludeColumns(Map<String,
String> config) {
+ String prefix = DataSourceConfigKeys.TABLE + ".";
+ String suffix = "." +
DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX;
+ Map<String, Set<String>> result = new HashMap<>();
+ for (String key : config.keySet()) {
+ if (key.startsWith(prefix) && key.endsWith(suffix)) {
+ String tableName = key.substring(prefix.length(), key.length()
- suffix.length());
+ if (!tableName.isEmpty()) {
+ result.put(tableName, parseExcludeColumns(config,
tableName));
+ }
+ }
+ }
+ return result;
+ }
+
public static Map<String, String> toStringMap(String json) {
if (!isJson(json)) {
return null;
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.out
new file mode 100644
index 00000000000..751b83e6525
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.out
@@ -0,0 +1,20 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot --
+A1 1
+B1 2
+
+-- !select_incremental --
+B1 20
+C1 3
+
+-- !select_after_drop_excluded --
+B1 20
+C1 3
+D1 4
+
+-- !select_after_readd_excluded --
+B1 20
+C1 3
+D1 4
+E1 5
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
index ecfd4a36cf3..2ddacebde16 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy
@@ -16,7 +16,7 @@
// under the License.
suite("test_streaming_mysql_job_dup",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
- def jobName = "test_streaming_mysql_job_name"
+ def jobName = "test_streaming_mysql_job_name_dup"
def currentDb = (sql "select database()")[0][0]
def table1 = "test_streaming_mysql_job_dup"
def mysqlDb = "test_cdc_db"
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.groovy
new file mode 100644
index 00000000000..b90995c9b40
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.groovy
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_col_filter",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_pg_col_filter"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_col_filter"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}_err1'"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}_err2'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // Create PG table with an extra "secret" column to be excluded
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "name" varchar(200),
+ "age" int2,
+ "secret" varchar(200),
+ PRIMARY KEY ("name")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES ('A1', 1,
'secret_A1')"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES ('B1', 2,
'secret_B1')"""
+ }
+
+ // ── Validation: exclude a non-existent column should fail
──────────────
+ try {
+ sql """CREATE JOB ${jobName}_err1
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "table.${table1}.exclude_columns" = "nonexistent_col"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+ assert false : "Should have thrown exception for non-existent
excluded column"
+ } catch (Exception e) {
+ log.info("Expected error for non-existent column: " + e.message)
+ assert e.message.contains("does not exist") : "Unexpected error
message: " + e.message
+ }
+
+ // ── Validation: exclude a PK column should fail
────────────────────────
+ try {
+ sql """CREATE JOB ${jobName}_err2
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "table.${table1}.exclude_columns" = "name"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+ assert false : "Should have thrown exception for excluding PK
column"
+ } catch (Exception e) {
+ log.info("Expected error for PK column: " + e.message)
+ assert e.message.contains("primary key") : "Unexpected error
message: " + e.message
+ }
+
+ // ── Main job: exclude "secret" column
──────────────────────────────────
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "table.${table1}.exclude_columns" = "secret"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Verify Doris table was created WITHOUT the excluded column
+ def colNames = (sql """desc ${currentDb}.${table1}""").collect { it[0]
}
+ assert !colNames.contains("secret") : "Excluded column 'secret' must
not appear in Doris table"
+ assert colNames.contains("name")
+ assert colNames.contains("age")
+
+ // Wait for snapshot to complete
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(1,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+ cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+ })
+ } catch (Exception ex) {
+ def showJob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showJob)
+ log.info("show task: " + showTask)
+ throw ex
+ }
+
+ // Snapshot: only name and age, secret absent
+ qt_select_snapshot """ SELECT * FROM ${table1} ORDER BY name ASC """
+
+ // ── Incremental DML: secret values must not appear in Doris
───────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES ('C1', 3,
'secret_C1')"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 20, secret =
'updated_secret' WHERE name = 'B1'"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE name =
'A1'"""
+ }
+ // Wait until C1 appears and A1 is gone
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def names = (sql """ SELECT name FROM ${table1} ORDER BY name
ASC """).collect { it[0] }
+ names.contains('C1') && !names.contains('A1')
+ })
+ } catch (Exception ex) {
+ def showJob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showJob)
+ log.info("show task: " + showTask)
+ throw ex
+ }
+
+ qt_select_incremental """ SELECT * FROM ${table1} ORDER BY name ASC """
+
+ // ── Schema change: DROP excluded column → DDL skipped, sync
continues ─
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """ALTER TABLE ${pgDB}.${pgSchema}.${table1} DROP COLUMN
secret"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('D1', 4)"""
+ }
+ // Wait until D1 appears (schema change was skipped, sync should
continue normally)
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def names = (sql """ SELECT name FROM ${table1} ORDER BY name
ASC """).collect { it[0] }
+ names.contains('D1')
+ })
+ } catch (Exception ex) {
+ def showJob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showJob)
+ log.info("show task: " + showTask)
+ throw ex
+ }
+
+ // Doris table still has no secret column (DDL was skipped)
+ def colNamesAfterDrop = (sql """desc
${currentDb}.${table1}""").collect { it[0] }
+ assert !colNamesAfterDrop.contains("secret") : "secret column must not
appear in Doris after DROP"
+
+ qt_select_after_drop_excluded """ SELECT * FROM ${table1} ORDER BY
name ASC """
+
+ // ── Schema change: re-ADD excluded column → DDL also skipped
──────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """ALTER TABLE ${pgDB}.${pgSchema}.${table1} ADD COLUMN secret
varchar(200)"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET secret =
're_secret_C1' WHERE name = 'C1'"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age,
secret) VALUES ('E1', 5, 'secret_E1')"""
+ }
+ // Wait until E1 appears (re-ADD DDL skipped, sync should continue
normally)
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def names = (sql """ SELECT name FROM ${table1} ORDER BY name
ASC """).collect { it[0] }
+ names.contains('E1')
+ })
+ } catch (Exception ex) {
+ def showJob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showJob)
+ log.info("show task: " + showTask)
+ throw ex
+ }
+
+ // Doris table still does NOT have secret column (ADD DDL skipped)
+ def colNamesAfterReAdd = (sql """desc
${currentDb}.${table1}""").collect { it[0] }
+ assert !colNamesAfterReAdd.contains("secret") : "secret column must
not appear in Doris after re-ADD"
+
+ qt_select_after_readd_excluded """ SELECT * FROM ${table1} ORDER BY
name ASC """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name = '${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]