This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 497adc82612 branch-4.1: [Improve](StreamingJob) Support schema change
for PostgreSQL streaming job #61182 (#61453)
497adc82612 is described below
commit 497adc826122b0848ca6efecd21deb2c26631acd
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 19 18:41:30 2026 +0800
branch-4.1: [Improve](StreamingJob) Support schema change for PostgreSQL
streaming job #61182 (#61453)
Cherry-picked from #61182
Co-authored-by: wudi <[email protected]>
---
.../doris/job/cdc/request/CommitOffsetRequest.java | 17 +-
.../job/cdc/request/JobBaseRecordRequest.java | 1 +
.../insert/streaming/StreamingInsertJob.java | 4 +
.../insert/streaming/StreamingMultiTblTask.java | 6 +
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 4 +
.../apache/doris/cdcclient/common/Constants.java | 2 +
.../apache/doris/cdcclient/common/DorisType.java | 47 +++
.../cdcclient/service/PipelineCoordinator.java | 61 +++-
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 25 +-
.../doris/cdcclient/sink/HttpPutBuilder.java | 3 +-
.../deserialize/DebeziumJsonDeserializer.java | 18 +-
.../source/deserialize/DeserializeResult.java | 92 ++++++
.../deserialize/MySqlDebeziumJsonDeserializer.java | 66 ++++
.../PostgresDebeziumJsonDeserializer.java | 248 +++++++++++++++
.../deserialize/SourceRecordDeserializer.java | 5 +
.../source/reader/AbstractCdcSourceReader.java | 171 ++++++++++
.../source/reader/JdbcIncrementalSourceReader.java | 24 +-
.../cdcclient/source/reader/SourceReader.java | 18 +-
.../source/reader/mysql/MySqlSourceReader.java | 21 +-
.../reader/postgres/PostgresSourceReader.java | 32 ++
.../org/apache/doris/cdcclient/utils/HttpUtil.java | 4 +
.../doris/cdcclient/utils/SchemaChangeHelper.java | 291 +++++++++++++++++
.../doris/cdcclient/utils/SchemaChangeManager.java | 149 +++++++++
.../cdcclient/utils/SchemaChangeHelperTest.java | 194 ++++++++++++
.../cdc/test_streaming_postgres_job_sc.out | 32 ++
.../test_streaming_postgres_job_sc_advanced.out | 31 ++
.../cdc/test_streaming_postgres_job_sc.groovy | 269 ++++++++++++++++
.../test_streaming_postgres_job_sc_advanced.groovy | 344 +++++++++++++++++++++
28 files changed, 2132 insertions(+), 47 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
index 3d2d221ea49..747e82b4fee 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
@@ -22,12 +22,10 @@ import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
-import lombok.ToString;
@Getter
@Setter
@NoArgsConstructor
-@ToString
@AllArgsConstructor
@Builder
public class CommitOffsetRequest {
@@ -38,4 +36,19 @@ public class CommitOffsetRequest {
public long filteredRows;
public long loadedRows;
public long loadBytes;
+ public String tableSchemas;
+
+ @Override
+ public String toString() {
+ return "CommitOffsetRequest{"
+ + "jobId=" + jobId
+ + ", taskId=" + taskId
+ + ", offset='" + offset + "'"
+ + ", scannedRows=" + scannedRows
+ + ", filteredRows=" + filteredRows
+ + ", loadedRows=" + loadedRows
+ + ", loadBytes=" + loadBytes
+ + ", tableSchemasSize=" + (tableSchemas != null ?
tableSchemas.length() : 0)
+ + "}";
+ }
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
index 282913e2dd2..27784b1701b 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
@@ -28,4 +28,5 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
public abstract class JobBaseRecordRequest extends JobBaseConfig {
protected Map<String, Object> meta;
+ protected String tableSchemas;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index f2b07e02ab1..ea5298c026a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1177,6 +1177,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider)
offsetProvider;
op.setHasMoreData(false);
}
+ if (offsetRequest.getTableSchemas() != null) {
+ JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider)
offsetProvider;
+ op.setTableSchemas(offsetRequest.getTableSchemas());
+ }
persistOffsetProviderIfNeed();
log.info("Streaming multi table job {} task {} commit offset
successfully, offset: {}",
getJobId(), offsetRequest.getTaskId(),
offsetRequest.getOffset());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index cf9a9b905be..45d2cf2ffbb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -198,6 +198,12 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
String feAddr = Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort();
request.setFrontendAddress(feAddr);
request.setMaxInterval(jobProperties.getMaxIntervalSecond());
+ if (offsetProvider instanceof JdbcSourceOffsetProvider) {
+ String schemas = ((JdbcSourceOffsetProvider)
offsetProvider).getTableSchemas();
+ if (schemas != null) {
+ request.setTableSchemas(schemas);
+ }
+ }
return request;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index d04245317b5..d8959086fa5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -87,6 +87,9 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
@SerializedName("bop")
Map<String, String> binlogOffsetPersist;
+ @SerializedName("ts")
+ String tableSchemas;
+
volatile boolean hasMoreData = true;
public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType,
Map<String, String> sourceProperties) {
@@ -355,6 +358,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
JdbcSourceOffsetProvider.class);
this.binlogOffsetPersist =
replayFromPersist.getBinlogOffsetPersist();
this.chunkHighWatermarkMap =
replayFromPersist.getChunkHighWatermarkMap();
+ this.tableSchemas = replayFromPersist.getTableSchemas();
log.info("Replaying offset provider for job {}, binlogOffset size
{}, chunkHighWatermark size {}",
getJobId(),
binlogOffsetPersist == null ? 0 :
binlogOffsetPersist.size(),
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
index 04ec118a6c2..953903a8032 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
@@ -23,4 +23,6 @@ public class Constants {
// Debezium default properties
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;
+
+ public static final String DORIS_TARGET_DB = "doris_target_db";
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
new file mode 100644
index 00000000000..3aad97bb0cd
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.common;
+
+public class DorisType {
+ public static final String BOOLEAN = "BOOLEAN";
+ public static final String TINYINT = "TINYINT";
+ public static final String SMALLINT = "SMALLINT";
+ public static final String INT = "INT";
+ public static final String BIGINT = "BIGINT";
+ public static final String LARGEINT = "LARGEINT";
+ // largeint is bigint unsigned in information_schema.COLUMNS
+ public static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ public static final String FLOAT = "FLOAT";
+ public static final String DOUBLE = "DOUBLE";
+ public static final String DECIMAL = "DECIMAL";
+ public static final String DATE = "DATE";
+ public static final String DATETIME = "DATETIME";
+ public static final String CHAR = "CHAR";
+ public static final String VARCHAR = "VARCHAR";
+ public static final String STRING = "STRING";
+ public static final String HLL = "HLL";
+ public static final String BITMAP = "BITMAP";
+ public static final String ARRAY = "ARRAY";
+ public static final String JSONB = "JSONB";
+ public static final String JSON = "JSON";
+ public static final String MAP = "MAP";
+ public static final String STRUCT = "STRUCT";
+ public static final String VARIANT = "VARIANT";
+ public static final String IPV4 = "IPV4";
+ public static final String IPV6 = "IPV6";
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 614c506619f..97aa4b7f5f2 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -21,8 +21,10 @@ import org.apache.doris.cdcclient.common.Constants;
import org.apache.doris.cdcclient.common.Env;
import org.apache.doris.cdcclient.model.response.RecordWithMeta;
import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
import org.apache.doris.cdcclient.source.reader.SourceReader;
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
+import org.apache.doris.cdcclient.utils.SchemaChangeManager;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
@@ -166,11 +168,12 @@ public class PipelineCoordinator {
}
// Process data messages
- List<String> serializedRecords =
+ DeserializeResult result =
sourceReader.deserialize(fetchRecord.getConfig(),
element);
- if (!CollectionUtils.isEmpty(serializedRecords)) {
+ if (result.getType() == DeserializeResult.Type.DML
+ && !CollectionUtils.isEmpty(result.getRecords())) {
recordCount++;
- recordResponse.getRecords().addAll(serializedRecords);
+
recordResponse.getRecords().addAll(result.getRecords());
hasReceivedData = true;
lastMessageIsHeartbeat = false;
}
@@ -236,21 +239,34 @@ public class PipelineCoordinator {
* <p>Heartbeat events will carry the latest offset.
*/
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
+ // Extract connection parameters up front for use throughout this
method
+ String feAddr = writeRecordRequest.getFrontendAddress();
+ String targetDb = writeRecordRequest.getTargetDb();
+ String token = writeRecordRequest.getToken();
+
+ // Enrich the source config with the Doris target DB so the
deserializer can build
+ // DDL referencing the correct Doris database, not the upstream source
database.
+ Map<String, String> deserializeContext = new
HashMap<>(writeRecordRequest.getConfig());
+ deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);
+
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
long scannedRows = 0L;
int heartbeatCount = 0;
SplitReadResult readResult = null;
+ boolean hasExecuteDDL = false;
+ boolean isSnapshotSplit = false;
try {
// 1. submit split async
readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
- boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
+ isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
boolean shouldStop = false;
boolean lastMessageIsHeartbeat = false;
+
LOG.info(
"Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
writeRecordRequest.getJobId(),
@@ -309,15 +325,22 @@ public class PipelineCoordinator {
}
// Process data messages
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
+ DeserializeResult result =
+ sourceReader.deserialize(deserializeContext,
element);
+
+ if (result.getType() ==
DeserializeResult.Type.SCHEMA_CHANGE) {
+ // Flush pending data before DDL
+ batchStreamLoad.forceFlush();
+ SchemaChangeManager.executeDdls(feAddr, targetDb,
token, result.getDdls());
+ hasExecuteDDL = true;
+
sourceReader.applySchemaChange(result.getUpdatedSchemas());
+ lastMessageIsHeartbeat = false;
+ }
+ if (!CollectionUtils.isEmpty(result.getRecords())) {
String table = extractTable(element);
- for (String record : serializedRecords) {
+ for (String record : result.getRecords()) {
scannedRows++;
- batchStreamLoad.writeRecord(database, table,
record.getBytes());
+ batchStreamLoad.writeRecord(targetDb, table,
record.getBytes());
}
// Mark last message as data (not heartbeat)
lastMessageIsHeartbeat = false;
@@ -346,8 +369,22 @@ public class PipelineCoordinator {
// The offset must be reset before commitOffset to prevent the next
taskId from being created
// by the fe.
batchStreamLoad.resetTaskId();
+
+ // Serialize tableSchemas back to FE when:
+ // 1. A DDL was executed (in-memory schema was updated), OR
+ // 2. It's a binlog split AND FE had no schema (FE tableSchemas was
null) — this covers
+ // incremental-only startup and the first binlog round after
snapshot completes.
+ String tableSchemas = null;
+ boolean feHadNoSchema = writeRecordRequest.getTableSchemas() == null;
+ if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) {
+ tableSchemas = sourceReader.serializeTableSchemas();
+ }
batchStreamLoad.commitOffset(
- currentTaskId, metaResponse, scannedRows,
batchStreamLoad.getLoadStatistic());
+ currentTaskId,
+ metaResponse,
+ scannedRows,
+ batchStreamLoad.getLoadStatistic(),
+ tableSchemas);
}
public static boolean isHeartbeatEvent(SourceRecord record) {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 92a2f9db2b6..72e84c4413c 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
@@ -503,7 +504,8 @@ public class DorisBatchStreamLoad implements Serializable {
String taskId,
List<Map<String, String>> meta,
long scannedRows,
- LoadStatistic loadStatistic) {
+ LoadStatistic loadStatistic,
+ String tableSchemas) {
try {
String url = String.format(COMMIT_URL_PATTERN, frontendAddress,
targetDb);
CommitOffsetRequest commitRequest =
@@ -515,6 +517,7 @@ public class DorisBatchStreamLoad implements Serializable {
.filteredRows(loadStatistic.getFilteredRows())
.loadedRows(loadStatistic.getLoadedRows())
.loadBytes(loadStatistic.getLoadBytes())
+ .tableSchemas(tableSchemas)
.build();
String param = OBJECT_MAPPER.writeValueAsString(commitRequest);
@@ -527,7 +530,11 @@ public class DorisBatchStreamLoad implements Serializable {
.commit()
.setEntity(new StringEntity(param));
- LOG.info("commit offset for jobId {} taskId {}, params {}", jobId,
taskId, param);
+ LOG.info(
+ "commit offset for jobId {} taskId {}, commitRequest {}",
+ jobId,
+ taskId,
+ commitRequest.toString());
Throwable resEx = null;
int retry = 0;
while (retry <= RETRY) {
@@ -541,11 +548,15 @@ public class DorisBatchStreamLoad implements Serializable
{
: "";
LOG.info("commit result {}", responseBody);
if (statusCode == 200) {
- LOG.info("commit offset for jobId {} taskId {}",
jobId, taskId);
- // A 200 response indicates that the request was
successful, and
- // information such as offset and statistics may
have already been
- // updated. Retrying may result in repeated
updates.
- return;
+ JsonNode root =
OBJECT_MAPPER.readTree(responseBody);
+ JsonNode code = root.get("code");
+ if (code != null && code.asInt() == 0) {
+ LOG.info(
+ "commit offset for jobId {} taskId {}
successfully",
+ jobId,
+ taskId);
+ return;
+ }
}
LOG.error(
"commit offset failed with {}, reason {}, to
retry",
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
index 3abd9eaabc2..d24f61397a2 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
@@ -18,6 +18,7 @@
package org.apache.doris.cdcclient.sink;
import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.HttpUtil;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.MapUtils;
@@ -69,7 +70,7 @@ public class HttpPutBuilder {
}
public HttpPutBuilder addTokenAuth(String token) {
- header.put(HttpHeaders.AUTHORIZATION, "Basic YWRtaW46");
+ header.put(HttpHeaders.AUTHORIZATION, HttpUtil.getAuthHeader());
header.put("token", token);
return this;
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 556c186b5d4..065a3da2c09 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -38,7 +38,6 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -58,6 +57,8 @@ import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTime;
@@ -65,17 +66,19 @@ import io.debezium.time.NanoTimestamp;
import io.debezium.time.Time;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
+import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** SourceRecord ==> [{},{}] */
+/** SourceRecord ==> DeserializeResult */
public class DebeziumJsonDeserializer
- implements SourceRecordDeserializer<SourceRecord, List<String>> {
+ implements SourceRecordDeserializer<SourceRecord, DeserializeResult> {
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(DebeziumJsonDeserializer.class);
private static ObjectMapper objectMapper = new ObjectMapper();
@Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
+ @Getter @Setter protected Map<TableId, TableChanges.TableChange>
tableSchemas;
public DebeziumJsonDeserializer() {}
@@ -86,15 +89,14 @@ public class DebeziumJsonDeserializer
}
@Override
- public List<String> deserialize(Map<String, String> context, SourceRecord
record)
+ public DeserializeResult deserialize(Map<String, String> context,
SourceRecord record)
throws IOException {
if (RecordUtils.isDataChangeRecord(record)) {
LOG.trace("Process data change record: {}", record);
- return deserializeDataChangeRecord(record);
- } else if (RecordUtils.isSchemaChangeEvent(record)) {
- return Collections.emptyList();
+ List<String> rows = deserializeDataChangeRecord(record);
+ return DeserializeResult.dml(rows);
} else {
- return Collections.emptyList();
+ return DeserializeResult.empty();
}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
new file mode 100644
index 00000000000..c1e69c77e5a
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
+/** Result of deserializing a SourceRecord. */
+public class DeserializeResult {
+
+ public enum Type {
+ DML,
+ SCHEMA_CHANGE,
+ EMPTY
+ }
+
+ private final Type type;
+ private final List<String> records;
+ private final List<String> ddls;
+ private final Map<TableId, TableChanges.TableChange> updatedSchemas;
+
+ private DeserializeResult(
+ Type type,
+ List<String> records,
+ List<String> ddls,
+ Map<TableId, TableChanges.TableChange> updatedSchemas) {
+ this.type = type;
+ this.records = records;
+ this.ddls = ddls;
+ this.updatedSchemas = updatedSchemas;
+ }
+
+ public static DeserializeResult dml(List<String> records) {
+ return new DeserializeResult(Type.DML, records, null, null);
+ }
+
+ public static DeserializeResult schemaChange(
+ List<String> ddls, Map<TableId, TableChanges.TableChange>
updatedSchemas) {
+ return new DeserializeResult(
+ Type.SCHEMA_CHANGE, Collections.emptyList(), ddls,
updatedSchemas);
+ }
+
+ /**
+ * Schema change result that also carries DML records from the triggering
record. The
+ * coordinator should execute DDLs first, then write the records.
+ */
+ public static DeserializeResult schemaChange(
+ List<String> ddls,
+ Map<TableId, TableChanges.TableChange> updatedSchemas,
+ List<String> records) {
+ return new DeserializeResult(Type.SCHEMA_CHANGE, records, ddls,
updatedSchemas);
+ }
+
+ public static DeserializeResult empty() {
+ return new DeserializeResult(Type.EMPTY, Collections.emptyList(),
null, null);
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public List<String> getRecords() {
+ return records;
+ }
+
+ public List<String> getDdls() {
+ return ddls;
+ }
+
+ public Map<TableId, TableChanges.TableChange> getUpdatedSchemas() {
+ return updatedSchemas;
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
new file mode 100644
index 00000000000..b64c7186983
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MySQL-specific deserializer that handles DDL schema change events.
+ *
+ * <p>When a schema change event is detected, it parses the HistoryRecord,
computes the diff against
+ * stored tableSchemas, generates Doris ALTER TABLE SQL, and returns a
SCHEMA_CHANGE result.
+ */
+public class MySqlDebeziumJsonDeserializer extends DebeziumJsonDeserializer {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(MySqlDebeziumJsonDeserializer.class);
+ private static final FlinkJsonTableChangeSerializer
TABLE_CHANGE_SERIALIZER =
+ new FlinkJsonTableChangeSerializer();
+
+ private String targetDb;
+
+ @Override
+ public void init(Map<String, String> props) {
+ super.init(props);
+ this.targetDb = props.get(DataSourceConfigKeys.DATABASE);
+ }
+
+ @Override
+ public DeserializeResult deserialize(Map<String, String> context,
SourceRecord record)
+ throws IOException {
+ if (RecordUtils.isSchemaChangeEvent(record)) {
+ return handleSchemaChangeEvent(record, context);
+ }
+ return super.deserialize(context, record);
+ }
+
+ private DeserializeResult handleSchemaChangeEvent(
+ SourceRecord record, Map<String, String> context) {
+        // TODO: record contains MySQL DDL; it needs to be converted to Doris DDL
+ return DeserializeResult.empty();
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
new file mode 100644
index 00000000000..2dc2310054b
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.SchemaChangeHelper;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP
column only) by comparing
+ * the record's Kafka Connect schema field names with stored tableSchemas.
+ *
+ * <p>Because PostgreSQL does not emit DDL events in the WAL stream, schema
detection is done by
+ * comparing the "after" struct field names in each DML record against the
known column set.
+ *
+ * <p>Type comparison is intentionally skipped to avoid false positives caused
by Kafka Connect type
+ * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column
add or drop is
+ * detected, the accurate column types are fetched directly from PostgreSQL
via the injected {@link
+ * #pgSchemaRefresher} callback.
+ *
+ * <p>MODIFY column type is not supported — users must manually execute ALTER
TABLE ... MODIFY
+ * COLUMN in Doris when a PG column type changes.
+ */
+public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer
{
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class);
+
+ /**
+ * Callback to fetch the current PG table schema for a single table via
JDBC. Injected by {@link
+ * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader}
after initialization.
+ */
+ private transient Function<TableId, TableChanges.TableChange>
pgSchemaRefresher;
+
+ public void setPgSchemaRefresher(Function<TableId,
TableChanges.TableChange> refresher) {
+ this.pgSchemaRefresher = refresher;
+ }
+
+ @Override
+ public DeserializeResult deserialize(Map<String, String> context,
SourceRecord record)
+ throws IOException {
+ if (!RecordUtils.isDataChangeRecord(record)) {
+ return DeserializeResult.empty();
+ }
+
+ Schema valueSchema = record.valueSchema();
+ if (valueSchema == null) {
+ return super.deserialize(context, record);
+ }
+
+ Field afterField = valueSchema.field(Envelope.FieldName.AFTER);
+ if (afterField == null) {
+ return super.deserialize(context, record);
+ }
+
+ Schema afterSchema = afterField.schema();
+ TableId tableId = extractTableId(record);
+ TableChanges.TableChange stored = tableSchemas != null ?
tableSchemas.get(tableId) : null;
+
+ // No baseline schema available — cannot detect changes, fall through
to normal
+ // deserialization
+ if (stored == null || stored.getTable() == null) {
+ LOG.debug(
+ "No stored schema for table {}, skipping schema change
detection.",
+ tableId.identifier());
+ return super.deserialize(context, record);
+ }
+
+ // First pass: name-only diff — fast, in-memory, no type comparison,
no false positives
+ SchemaChangeHelper.SchemaDiff nameDiff =
+ SchemaChangeHelper.diffSchemaByName(afterSchema, stored);
+ if (nameDiff.isEmpty()) {
+ return super.deserialize(context, record);
+ }
+
+ Preconditions.checkNotNull(
+ pgSchemaRefresher,
+ "pgSchemaRefresher callback is not set. Cannot fetch fresh PG
schema for change detection.");
+
+ // the last fresh schema
+ TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId);
+ if (fresh == null || fresh.getTable() == null) {
+ // Cannot proceed: DDL must be executed before the triggering DML
record is written,
+ // otherwise new column data in this record would be silently
dropped.
+ // Throwing here causes the batch to be retried from the same
offset.
+ throw new IOException(
+ "Failed to fetch fresh schema for table "
+ + tableId.identifier()
+ + "; cannot apply schema change safely. Will
retry.");
+ }
+
+ // Second diff: use afterSchema as the source of truth for which
columns the current WAL
+ // record is aware of. Only process additions/drops visible in
afterSchema — columns that
+ // exist in fresh (JDBC) but are absent from afterSchema belong to a
later DDL that has not
+ // yet produced a DML record, and will be processed when that DML
record arrives.
+ //
+ // pgAdded: present in afterSchema but absent from stored → look up
Column in fresh for
+ // accurate PG type metadata. If fresh doesn't have the
column yet (shouldn't
+ // happen normally), skip it.
+ // pgDropped: present in stored but absent from afterSchema.
+ List<Column> pgAdded = new ArrayList<>();
+ List<String> pgDropped = new ArrayList<>();
+
+ for (Field field : afterSchema.fields()) {
+ if (stored.getTable().columnWithName(field.name()) == null) {
+ Column freshCol =
fresh.getTable().columnWithName(field.name());
+ if (freshCol != null) {
+ pgAdded.add(freshCol);
+ }
+ }
+ }
+
+ for (Column col : stored.getTable().columns()) {
+ if (afterSchema.field(col.name()) == null) {
+ pgDropped.add(col.name());
+ }
+ }
+
+ // Second diff is empty: nameDiff was a false positive (PG schema
unchanged vs stored).
+ // This happens when pgSchemaRefresher returns a schema ahead of the
current WAL position
+ // (e.g. a later DDL was already applied in PG while we're still
consuming older DML
+ // records).
+ // No DDL needed, no tableSchema update, no extra stream load — just
process the DML
+ // normally.
+ if (pgAdded.isEmpty() && pgDropped.isEmpty()) {
+ return super.deserialize(context, record);
+ }
+
+ // Build updatedSchemas from fresh filtered to afterSchema columns
only, so that the stored
+ // cache does not jump ahead to include columns not yet seen by any
DML record. Those
+ // unseen columns will trigger their own schema change when their
first DML record arrives.
+ TableEditor editor = fresh.getTable().edit();
+ for (Column col : fresh.getTable().columns()) {
+ if (afterSchema.field(col.name()) == null) {
+ editor.removeColumn(col.name());
+ }
+ }
+ TableChanges.TableChange filteredChange =
+ new
TableChanges.TableChange(TableChanges.TableChangeType.ALTER, editor.create());
+ Map<TableId, TableChanges.TableChange> updatedSchemas = new
HashMap<>();
+ updatedSchemas.put(tableId, filteredChange);
+
+ // Rename guard: simultaneous ADD+DROP may be a column RENAME — skip
DDL to avoid data loss.
+ // Users must manually RENAME the column in Doris.
+ if (!pgAdded.isEmpty() && !pgDropped.isEmpty()) {
+ LOG.warn(
+ "[SCHEMA-CHANGE-SKIPPED] Table: {}\n"
+ + "Potential RENAME detected (simultaneous
DROP+ADD).\n"
+ + "Dropped columns: {}\n"
+ + "Added columns: {}\n"
+ + "No DDL emitted to prevent data loss.\n"
+ + "Action required: manually RENAME column(s) in
Doris,"
+ + " then data will resume.",
+ tableId.identifier(),
+ pgDropped,
+
pgAdded.stream().map(Column::name).collect(Collectors.toList()));
+ List<String> dmlRecords = super.deserialize(context,
record).getRecords();
+ return DeserializeResult.schemaChange(
+ Collections.emptyList(), updatedSchemas, dmlRecords);
+ }
+
+ // Generate DDLs using accurate PG column types
+ String db = context.get(Constants.DORIS_TARGET_DB);
+ List<String> ddls = new ArrayList<>();
+
+ for (String colName : pgDropped) {
+ ddls.add(SchemaChangeHelper.buildDropColumnSql(db,
tableId.table(), colName));
+ }
+
+ for (Column col : pgAdded) {
+ String colType = SchemaChangeHelper.columnToDorisType(col);
+ String nullable = col.isOptional() ? "" : " NOT NULL";
+ // pgAdded only contains columns present in afterSchema, so field
lookup is safe.
+ // afterSchema.defaultValue() returns an already-deserialized Java
object
+ // (e.g. String "hello", Integer 42) — no PG SQL cast suffix to
strip.
+ // PG WAL DML records do not carry column comment metadata.
+ Object defaultObj =
afterSchema.field(col.name()).schema().defaultValue();
+ ddls.add(
+ SchemaChangeHelper.buildAddColumnSql(
+ db,
+ tableId.table(),
+ col.name(),
+ colType + nullable,
+ defaultObj != null ? String.valueOf(defaultObj) :
null,
+ null));
+ }
+
+ List<String> dmlRecords = super.deserialize(context,
record).getRecords();
+
+ LOG.info(
+ "Postgres schema change detected for table {}: added={},
dropped={}. DDLs: {}",
+ tableId.identifier(),
+
pgAdded.stream().map(Column::name).collect(Collectors.toList()),
+ pgDropped,
+ ddls);
+
+ return DeserializeResult.schemaChange(ddls, updatedSchemas,
dmlRecords);
+ }
+
+ private TableId extractTableId(SourceRecord record) {
+ Struct value = (Struct) record.value();
+ Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+ String schemaName = source.getString(SCHEMA_NAME_KEY);
+ String tableName = source.getString(TABLE_NAME_KEY);
+ return new TableId(null, schemaName, tableName);
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
index f93567a230a..cc0a519da07 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
@@ -21,8 +21,13 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
public interface SourceRecordDeserializer<T, C> extends Serializable {
void init(Map<String, String> props);
C deserialize(Map<String, String> context, T record) throws IOException;
+
+ default void setTableSchemas(Map<TableId, TableChanges.TableChange>
tableSchemas) {}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
new file mode 100644
index 00000000000..6ebf75a99aa
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
+import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
+
+import org.apache.flink.cdc.connectors.base.utils.SerializerUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.debezium.document.Document;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import lombok.Getter;
+import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class providing common schema-tracking functionality for CDC
source readers.
+ *
+ * <p>Handles serialization/deserialization of {@code tableSchemas} between FE
and cdc_client, and
+ * provides a helper to load schemas from the incoming {@link
JobBaseRecordRequest}.
+ */
+@Getter
+@Setter
+public abstract class AbstractCdcSourceReader implements SourceReader {
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractCdcSourceReader.class);
+
+ protected static final FlinkJsonTableChangeSerializer
TABLE_CHANGE_SERIALIZER =
+ new FlinkJsonTableChangeSerializer();
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ protected SourceRecordDeserializer<SourceRecord, DeserializeResult>
serializer;
+ protected Map<TableId, TableChanges.TableChange> tableSchemas;
+
+ /**
+ * Load tableSchemas from a JSON string (produced by {@link
#serializeTableSchemas()}). Used
+ * when a binlog/stream split is resumed from FE-persisted state.
+ *
+ * <p>Format: {@code
[{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+ */
+ public void loadTableSchemasFromJson(String json) throws IOException {
+ if (json == null || json.isEmpty()) {
+ return;
+ }
+ JsonNode root = OBJECT_MAPPER.readTree(json);
+ Map<TableId, TableChanges.TableChange> map = new ConcurrentHashMap<>();
+ DocumentReader docReader = DocumentReader.defaultReader();
+ for (JsonNode entry : root) {
+ boolean uc = entry.path("uc").asBoolean(false);
+ TableId tableId = TableId.parse(entry.get("i").asText(), uc);
+ Document doc =
docReader.read(OBJECT_MAPPER.writeValueAsString(entry.get("c")));
+ TableChanges.TableChange change =
FlinkJsonTableChangeSerializer.fromDocument(doc, uc);
+ map.put(tableId, change);
+ }
+ this.tableSchemas = map;
+ this.serializer.setTableSchemas(map);
+ LOG.info("Loaded {} table schemas from JSON", map.size());
+ }
+
+ /**
+ * Serialize current tableSchemas to a compact JSON string for FE
persistence.
+ *
+ * <p>Stores the Debezium document as a nested JSON object (not a string)
to avoid redundant
+ * escaping. Format: {@code
+ * [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+ */
+ @Override
+ public String serializeTableSchemas() {
+ if (tableSchemas == null || tableSchemas.isEmpty()) {
+ return null;
+ }
+ try {
+ DocumentWriter docWriter = DocumentWriter.defaultWriter();
+ ArrayNode result = OBJECT_MAPPER.createArrayNode();
+ for (Map.Entry<TableId, TableChanges.TableChange> e :
tableSchemas.entrySet()) {
+ TableId tableId = e.getKey();
+ // useCatalogBeforeSchema: false when catalog is null but
schema is set (e.g. PG)
+ boolean uc =
SerializerUtils.shouldUseCatalogBeforeSchema(tableId);
+ ObjectNode entry = OBJECT_MAPPER.createObjectNode();
+ entry.put("i", tableId.toDoubleQuotedString());
+ entry.put("uc", uc);
+ // parse compact doc JSON into a JsonNode so "c" is a nested
object, not a string
+ entry.set(
+ "c",
+ OBJECT_MAPPER.readTree(
+
docWriter.write(TABLE_CHANGE_SERIALIZER.toDocument(e.getValue()))));
+ result.add(entry);
+ }
+ return OBJECT_MAPPER.writeValueAsString(result);
+ } catch (Exception e) {
+ // Return null so the current batch is not failed — data keeps
flowing and
+ // schema persistence will be retried on the next DDL or
feHadNoSchema batch.
+ // For PostgreSQL this is safe: WAL records carry afterSchema so
the next DML
+ // will re-trigger schema-change detection and self-heal.
+ // WARNING: for MySQL (schema change not yet implemented),
returning null here
+ // is dangerous — MySQL binlog has no inline schema, so loading a
stale
+ // pre-DDL schema from FE on the next task would cause column
mismatches
+ // (flink-cdc#732). When MySQL schema change is implemented, this
must throw
+ // instead of returning null to prevent committing the offset with
a stale schema.
+ LOG.error(
+ "Failed to serialize tableSchemas, schema will not be
persisted to FE"
+ + " in this cycle. Will retry on next DDL or
batch.",
+ e);
+ return null;
+ }
+ }
+
+ /** Apply schema changes to in-memory tableSchemas and notify the
serializer. */
+ @Override
+ public void applySchemaChange(Map<TableId, TableChanges.TableChange>
updatedSchemas) {
+ if (updatedSchemas == null || updatedSchemas.isEmpty()) {
+ return;
+ }
+ if (tableSchemas == null) {
+ tableSchemas = new ConcurrentHashMap<>(updatedSchemas);
+ } else {
+ tableSchemas.putAll(updatedSchemas);
+ }
+ serializer.setTableSchemas(tableSchemas);
+ }
+
+ /**
+ * Load FE-persisted tableSchemas into memory from the incoming request.
+ *
+ * <p>FE's schema and offset are always committed together, so FE's schema
always corresponds to
+ * the starting offset of the current batch. Loading it unconditionally
ensures the deserializer
+ * uses the correct baseline — particularly critical for MySQL: Flink CDC
only retains the
+ * latest schema in memory, so if a previous batch executed a DDL but
failed to commit the
+ * offset, retrying from the pre-DDL offset with a stale post-DDL cache
would cause
+ * schema-mismatch errors on every retry (see flink-cdc#732). PostgreSQL
is unaffected by this
+ * because WAL records carry the schema at the time they were written, but
loading FE's schema
+ * unconditionally is still correct: any re-detected DDL will be handled
idempotently by {@link
+ * SchemaChangeManager}.
+ *
+ * <p>Call this at the start of preparing a binlog/stream split.
+ */
+ protected void tryLoadTableSchemasFromRequest(JobBaseRecordRequest
baseReq) throws IOException {
+ loadTableSchemasFromJson(baseReq.getTableSchemas());
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 36111d0fbf4..5b8e343faae 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -18,7 +18,7 @@
package org.apache.doris.cdcclient.source.reader;
import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
-import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
import org.apache.doris.cdcclient.source.factory.DataSource;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -83,11 +83,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Data
-public abstract class JdbcIncrementalSourceReader implements SourceReader {
+public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReader {
private static final Logger LOG =
LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
private static ObjectMapper objectMapper = new ObjectMapper();
- private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
- private Map<TableId, TableChanges.TableChange> tableSchemas;
// Support for multiple snapshot splits
private List<
@@ -334,6 +332,22 @@ public abstract class JdbcIncrementalSourceReader
implements SourceReader {
/** Prepare stream split */
private SplitReadResult prepareStreamSplit(
Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq)
throws Exception {
+ // Load tableSchemas from FE if available (avoids re-discover on
restart)
+ tryLoadTableSchemasFromRequest(baseReq);
+ // If still null (incremental-only startup, or snapshot→binlog
transition where FE never
+ // persisted schema), do a JDBC discover so the deserializer has a
baseline to diff against.
+ if (this.tableSchemas == null) {
+ LOG.info(
+ "No tableSchemas available for stream split, discovering
via JDBC for job {}",
+ baseReq.getJobId());
+ Map<TableId, TableChanges.TableChange> discovered =
getTableSchemas(baseReq);
+ this.tableSchemas = new
java.util.concurrent.ConcurrentHashMap<>(discovered);
+ this.serializer.setTableSchemas(this.tableSchemas);
+ LOG.info(
+ "Discovered {} table schema(s) for job {}",
+ discovered.size(),
+ baseReq.getJobId());
+ }
Tuple2<SourceSplitBase, Boolean> splitFlag =
createStreamSplit(offsetMeta, baseReq);
this.streamSplit = splitFlag.f0.asStreamSplit();
this.streamReader = getBinlogSplitReader(baseReq);
@@ -908,7 +922,7 @@ public abstract class JdbcIncrementalSourceReader
implements SourceReader {
}
@Override
- public List<String> deserialize(Map<String, String> config, SourceRecord
element)
+ public DeserializeResult deserialize(Map<String, String> config,
SourceRecord element)
throws IOException {
return serializer.deserialize(config, element);
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index 6c1a018dde3..fa4578d509b 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -17,6 +17,7 @@
package org.apache.doris.cdcclient.source.reader;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
import org.apache.doris.cdcclient.source.factory.DataSource;
import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -32,6 +33,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
/** Source Reader Interface */
public interface SourceReader {
String SPLIT_ID = "splitId";
@@ -75,7 +79,19 @@ public interface SourceReader {
/** Called when closing */
void close(JobBaseConfig jobConfig);
- List<String> deserialize(Map<String, String> config, SourceRecord element)
throws IOException;
+ DeserializeResult deserialize(Map<String, String> config, SourceRecord
element)
+ throws IOException;
+
+ /**
+ * Apply schema changes to the in-memory tableSchemas. Called after schema
change is executed on
+ * Doris.
+ */
+ default void applySchemaChange(Map<TableId, TableChanges.TableChange>
updatedSchemas) {}
+
+ /** Serialize current tableSchemas to JSON for persistence via
commitOffset. */
+ default String serializeTableSchemas() {
+ return null;
+ }
/**
* Commits the given offset with the source database. Used by some source
like Postgres to
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 83c9b349e4e..11e5007894d 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -17,11 +17,11 @@
package org.apache.doris.cdcclient.source.reader.mysql;
-import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
-import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
+import
org.apache.doris.cdcclient.source.deserialize.MySqlDebeziumJsonDeserializer;
import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.AbstractCdcSourceReader;
import org.apache.doris.cdcclient.source.reader.SnapshotReaderContext;
-import org.apache.doris.cdcclient.source.reader.SourceReader;
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
import org.apache.doris.cdcclient.source.reader.SplitRecords;
import org.apache.doris.cdcclient.utils.ConfigUtil;
@@ -62,7 +62,6 @@ import
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
-import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.kafka.connect.source.SourceRecord;
@@ -110,13 +109,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Data
-public class MySqlSourceReader implements SourceReader {
+public class MySqlSourceReader extends AbstractCdcSourceReader {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlSourceReader.class);
private static ObjectMapper objectMapper = new ObjectMapper();
- private static final FlinkJsonTableChangeSerializer
TABLE_CHANGE_SERIALIZER =
- new FlinkJsonTableChangeSerializer();
- private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
- private Map<TableId, TableChanges.TableChange> tableSchemas;
// Support for multiple snapshot splits with Round-Robin polling
private List<
@@ -135,7 +130,7 @@ public class MySqlSourceReader implements SourceReader {
private MySqlBinlogSplitState binlogSplitState;
public MySqlSourceReader() {
- this.serializer = new DebeziumJsonDeserializer();
+ this.serializer = new MySqlDebeziumJsonDeserializer();
this.snapshotReaderContexts = new ArrayList<>();
}
@@ -339,6 +334,8 @@ public class MySqlSourceReader implements SourceReader {
/** Prepare binlog split */
private SplitReadResult prepareBinlogSplit(
Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq)
throws Exception {
+ // Load tableSchemas from FE if available (avoids re-discover on
restart)
+ tryLoadTableSchemasFromRequest(baseReq);
Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta,
baseReq);
this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
this.binlogReader = getBinlogSplitReader(baseReq);
@@ -778,6 +775,8 @@ public class MySqlSourceReader implements SourceReader {
configFactory.serverTimeZone(
ConfigUtil.getTimeZoneFromProps(cu.getOriginalProperties()).toString());
+ // Schema change handling for MySQL is not yet implemented; keep
disabled to avoid
+ // unnecessary processing overhead until DDL support is added.
configFactory.includeSchemaChanges(false);
String includingTables =
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
@@ -992,7 +991,7 @@ public class MySqlSourceReader implements SourceReader {
}
@Override
- public List<String> deserialize(Map<String, String> config, SourceRecord
element)
+ public DeserializeResult deserialize(Map<String, String> config,
SourceRecord element)
throws IOException {
return serializer.deserialize(config, element);
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index a0bb57ad120..737e36045d9 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -19,6 +19,7 @@ package org.apache.doris.cdcclient.source.reader.postgres;
import org.apache.doris.cdcclient.common.Constants;
import org.apache.doris.cdcclient.exception.CdcClientException;
+import
org.apache.doris.cdcclient.source.deserialize.PostgresDebeziumJsonDeserializer;
import org.apache.doris.cdcclient.source.factory.DataSource;
import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
import org.apache.doris.cdcclient.utils.ConfigUtil;
@@ -55,6 +56,7 @@ import org.apache.flink.table.types.DataType;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -84,6 +86,7 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
public PostgresSourceReader() {
super();
+ this.setSerializer(new PostgresDebeziumJsonDeserializer());
}
@Override
@@ -95,6 +98,12 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
createSlotForGlobalStreamSplit(dialect);
}
super.initialize(jobId, dataSource, config);
+ // Inject PG schema refresher so the deserializer can fetch accurate
column types on DDL
+ if (serializer instanceof PostgresDebeziumJsonDeserializer) {
+ ((PostgresDebeziumJsonDeserializer) serializer)
+ .setPgSchemaRefresher(
+ tableId -> refreshSingleTableSchema(tableId,
config, jobId));
+ }
}
/**
@@ -359,6 +368,29 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
}
}
+ /**
+ * Fetch the current schema for a single table directly from PostgreSQL
via JDBC.
+ *
+ * <p>Called by {@link PostgresDebeziumJsonDeserializer} when a schema
change (ADD/DROP column)
+ * is detected, to obtain accurate PG column types for DDL generation.
+ *
+ * @return the fresh {@link TableChanges.TableChange}
+ */
+ private TableChanges.TableChange refreshSingleTableSchema(
+ TableId tableId, Map<String, String> config, long jobId) {
+ PostgresSourceConfig sourceConfig = generatePostgresConfig(config,
jobId, 0);
+ PostgresDialect dialect = new PostgresDialect(sourceConfig);
+ try (JdbcConnection jdbcConnection =
dialect.openJdbcConnection(sourceConfig)) {
+ CustomPostgresSchema customPostgresSchema =
+ new CustomPostgresSchema((PostgresConnection)
jdbcConnection, sourceConfig);
+ Map<TableId, TableChanges.TableChange> schemas =
+
customPostgresSchema.getTableSchema(Collections.singletonList(tableId));
+ return schemas.get(tableId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
protected FetchTask<SourceSplitBase> createFetchTaskFromSplit(
JobBaseConfig jobConfig, SourceSplitBase split) {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
index 4d1356003fb..05407b2c89d 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
@@ -52,4 +52,8 @@ public class HttpUtil {
.addInterceptorLast(new RequestContent(true))
.build();
}
+
+ public static String getAuthHeader() {
+ return "Basic YWRtaW46";
+ }
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
new file mode 100644
index 00000000000..5eea4f1f16f
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */
+public class SchemaChangeHelper {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaChangeHelper.class);
+ private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+ private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+ private SchemaChangeHelper() {}
+
+ // ─── Schema diff result
────────────────────────────────────────────────────
+
+ /**
+ * Holds the result of a full schema comparison between an after-schema
and stored TableChange.
+ */
+ public static class SchemaDiff {
+ /** Fields present in afterSchema but absent from stored. */
+ public final List<Field> added;
+
+ /** Column names present in stored but absent from afterSchema. */
+ public final List<String> dropped;
+
+ /** Same-named columns whose Doris type or default value differs. */
+ public final Map<String, Field> modified;
+
+ public SchemaDiff(List<Field> added, List<String> dropped, Map<String,
Field> modified) {
+ this.added = added;
+ this.dropped = dropped;
+ this.modified = modified;
+ }
+
+ public boolean isEmpty() {
+ return added.isEmpty() && dropped.isEmpty() && modified.isEmpty();
+ }
+ }
+
+ // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange)
──────
+
+ /**
+ * Name-only schema diff: compare field names in {@code afterSchema}
against the stored {@link
+ * TableChanges.TableChange}, detecting added and dropped columns by name
only.
+ *
+ * <p>Only support add and drop and not support modify and rename
+ *
+ * <p>When {@code stored} is null or empty, both lists are empty (no
baseline to diff against).
+ */
+ public static SchemaDiff diffSchemaByName(Schema afterSchema,
TableChanges.TableChange stored) {
+ List<Field> added = new ArrayList<>();
+ List<String> dropped = new ArrayList<>();
+
+ if (afterSchema == null || stored == null || stored.getTable() ==
null) {
+ return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+ }
+
+ // Detect added: fields present in afterSchema but absent from stored
+ for (Field field : afterSchema.fields()) {
+ if (stored.getTable().columnWithName(field.name()) == null) {
+ added.add(field);
+ }
+ }
+
+ // Detect dropped: columns present in stored but absent from
afterSchema
+ for (Column col : stored.getTable().columns()) {
+ if (afterSchema.field(col.name()) == null) {
+ dropped.add(col.name());
+ }
+ }
+
+ return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+ }
+
+ // ─── Quoting helpers
──────────────────────────────────────────────────────
+
+ /** Wrap a name in backticks if not already quoted. */
+ public static String identifier(String name) {
+ if (name.startsWith("`") && name.endsWith("`")) {
+ return name;
+ }
+ return "`" + name + "`";
+ }
+
+ /** Return a fully-qualified {@code `db`.`table`} identifier string. */
+ public static String quoteTableIdentifier(String db, String table) {
+ return identifier(db) + "." + identifier(table);
+ }
+
+ /**
+ * Format a default value (already a plain Java string, not a raw SQL
expression) into a form
+ * suitable for a Doris {@code DEFAULT} clause.
+ *
+ * <p>The caller is expected to pass a <em>deserialized</em> value — e.g.
obtained from the
+ * Kafka Connect schema via {@code
field.schema().defaultValue().toString()} — rather than a raw
+ * PG SQL expression. This avoids having to strip PG-specific type casts
({@code ::text}, etc.).
+ *
+ * <ul>
+ * <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code
TRUE}, {@code FALSE}) are
+ * returned as-is.
+ * <li>Numeric literals are returned as-is (no quotes).
+ * <li>Everything else is wrapped in single quotes.
+ * </ul>
+ */
+ public static String quoteDefaultValue(String defaultValue) {
+ if (defaultValue == null) {
+ return null;
+ }
+ if (defaultValue.equalsIgnoreCase("current_timestamp")
+ || defaultValue.equalsIgnoreCase("null")
+ || defaultValue.equalsIgnoreCase("true")
+ || defaultValue.equalsIgnoreCase("false")) {
+ return defaultValue;
+ }
+ try {
+ Double.parseDouble(defaultValue);
+ return defaultValue;
+ } catch (NumberFormatException ignored) {
+ // fall through
+ }
+ return "'" + defaultValue.replace("'", "''") + "'";
+ }
+
+ /** Escape single quotes inside a COMMENT string. */
+ public static String quoteComment(String comment) {
+ if (comment == null) {
+ return "";
+ }
+ return comment.replace("'", "''");
+ }
+
+ // ─── DDL builders
─────────────────────────────────────────────────────────
+
+ /**
+ * Build {@code ALTER TABLE ... ADD COLUMN} SQL.
+ *
+ * @param db target database
+ * @param table target table
+ * @param colName column name
+ * @param colType Doris column type string (including optional NOT NULL)
+ * @param defaultValue optional DEFAULT value; {@code null} = omit DEFAULT
clause
+ * @param comment optional COMMENT; {@code null}/empty = omit COMMENT
clause
+ */
+ public static String buildAddColumnSql(
+ String db,
+ String table,
+ String colName,
+ String colType,
+ String defaultValue,
+ String comment) {
+ StringBuilder sb =
+ new StringBuilder(
+ String.format(
+ ADD_DDL,
+ quoteTableIdentifier(db, table),
+ identifier(colName),
+ colType));
+ if (defaultValue != null) {
+ sb.append(" DEFAULT ").append(quoteDefaultValue(defaultValue));
+ }
+ appendComment(sb, comment);
+ return sb.toString();
+ }
+
+ /** Build {@code ALTER TABLE ... DROP COLUMN} SQL. */
+ public static String buildDropColumnSql(String db, String table, String
colName) {
+ return String.format(DROP_DDL, quoteTableIdentifier(db, table),
identifier(colName));
+ }
+
+ // ─── Type mapping
─────────────────────────────────────────────────────────
+
+ /** Convert a Debezium Column to a Doris column type string (via PG type
name). */
+ public static String columnToDorisType(Column column) {
+ return pgTypeNameToDorisType(column.typeName(), column.length(),
column.scale().orElse(-1));
+ }
+
+ /** Map a PostgreSQL native type name to a Doris type string. */
+ static String pgTypeNameToDorisType(String pgTypeName, int length, int
scale) {
+ Preconditions.checkNotNull(pgTypeName);
+ switch (pgTypeName.toLowerCase()) {
+ case "bool":
+ return DorisType.BOOLEAN;
+ case "bit":
+ return length == 1 ? DorisType.BOOLEAN : DorisType.STRING;
+ case "int2":
+ case "smallserial":
+ return DorisType.SMALLINT;
+ case "int4":
+ case "serial":
+ return DorisType.INT;
+ case "int8":
+ case "bigserial":
+ return DorisType.BIGINT;
+ case "float4":
+ return DorisType.FLOAT;
+ case "float8":
+ return DorisType.DOUBLE;
+ case "numeric":
+ {
+ int p = length > 0 ? Math.min(length, 38) : 38;
+ int s = scale >= 0 ? scale : 9;
+ return String.format("%s(%d, %d)", DorisType.DECIMAL, p,
s);
+ }
+ case "bpchar":
+ {
+ if (length <= 0) {
+ return DorisType.STRING;
+ }
+ int len = length * 3;
+ if (len > 255) {
+ return String.format("%s(%s)", DorisType.VARCHAR, len);
+ } else {
+ return String.format("%s(%s)", DorisType.CHAR, len);
+ }
+ }
+ case "date":
+ return DorisType.DATE;
+ case "timestamp":
+ case "timestamptz":
+ {
+ int s = (scale >= 0 && scale <= 6) ? scale : 6;
+ return String.format("%s(%d)", DorisType.DATETIME, s);
+ }
+ // All remaining types map to STRING (aligned with
JdbcPostgreSQLClient)
+ case "point":
+ case "line":
+ case "lseg":
+ case "box":
+ case "path":
+ case "polygon":
+ case "circle":
+ case "varchar":
+ case "text":
+ case "time":
+ case "timetz":
+ case "interval":
+ case "cidr":
+ case "inet":
+ case "macaddr":
+ case "varbit":
+ case "uuid":
+ case "bytea":
+ return DorisType.STRING;
+ case "json":
+ case "jsonb":
+ return DorisType.JSON;
+ default:
+ LOG.warn("Unrecognized PostgreSQL type '{}', defaulting to
STRING", pgTypeName);
+ return DorisType.STRING;
+ }
+ }
+
+ // ─── Internal helpers
─────────────────────────────────────────────────────
+
+ private static void appendComment(StringBuilder sb, String comment) {
+ if (comment != null && !comment.isEmpty()) {
+ sb.append(" COMMENT '").append(quoteComment(comment)).append("'");
+ }
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
new file mode 100644
index 00000000000..b392df9cfcd
--- /dev/null
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static utility class for executing DDL schema changes on the Doris FE via
HTTP. */
+public class SchemaChangeManager {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaChangeManager.class);
+ private static final String SCHEMA_CHANGE_API =
"http://%s/api/query/default_cluster/%s";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String COLUMN_EXISTS_MSG = "Can not add column which
already exists";
+ private static final String COLUMN_NOT_EXISTS_MSG = "Column does not
exists";
+
+ private SchemaChangeManager() {}
+
+ /**
+ * Execute a list of DDL statements on FE. Each statement is sent
independently.
+ *
+ * <p>Idempotent errors (ADD COLUMN when column already exists, DROP
COLUMN when column does not
+ * exist) are logged as warnings and silently skipped, so retries on a
different BE after a
+ * failed commitOffset do not cause infinite failures.
+ *
+ * @param feAddr Doris FE address (host:port)
+ * @param db target database
+ * @param token FE auth token
+ * @param sqls DDL statements to execute
+ */
+ public static void executeDdls(String feAddr, String db, String token,
List<String> sqls)
+ throws IOException {
+ if (sqls == null || sqls.isEmpty()) {
+ LOG.info("No DDL statements to execute");
+ return;
+ }
+ for (String stmt : sqls) {
+ stmt = stmt.trim();
+ if (stmt.isEmpty()) {
+ continue;
+ }
+ LOG.info("Executing DDL on FE {}: {}", feAddr, stmt);
+ execute(feAddr, db, token, stmt);
+ }
+ }
+
+ /**
+ * Execute a single SQL statement via the FE query API.
+ *
+ * <p>Idempotent errors are swallowed with a warning; all other errors
throw {@link
+ * IOException}.
+ */
+ public static void execute(String feAddr, String db, String token, String
sql)
+ throws IOException {
+ HttpPost post = buildHttpPost(feAddr, db, token, sql);
+ String responseBody = handleResponse(post);
+ LOG.info("Executed DDL {} with response: {}", sql, responseBody);
+ parseResponse(sql, responseBody);
+ }
+
+ // ─── Internal helpers
─────────────────────────────────────────────────────
+
+ private static HttpPost buildHttpPost(String feAddr, String db, String
token, String sql)
+ throws IOException {
+ String url = String.format(SCHEMA_CHANGE_API, feAddr, db);
+ Map<String, Object> bodyMap = new HashMap<>();
+ bodyMap.put("stmt", sql);
+ String body = OBJECT_MAPPER.writeValueAsString(bodyMap);
+
+ HttpPost post = new HttpPost(url);
+ post.setHeader("Content-Type", "application/json;charset=UTF-8");
+ post.setHeader("Authorization", HttpUtil.getAuthHeader());
+ post.setHeader("token", token);
+ post.setEntity(new StringEntity(body, "UTF-8"));
+ return post;
+ }
+
+ private static String handleResponse(HttpPost request) throws IOException {
+ try (CloseableHttpClient client = HttpUtil.getHttpClient();
+ CloseableHttpResponse response = client.execute(request)) {
+ String responseBody =
+ response.getEntity() != null ?
EntityUtils.toString(response.getEntity()) : "";
+ LOG.debug("HTTP [{}]: {}", request.getURI(), responseBody);
+ return responseBody;
+ }
+ }
+
+ /**
+ * Parse the FE response. Idempotent errors are logged as warnings and
skipped; all other errors
+ * throw.
+ *
+ * <p>Idempotent conditions (can occur when a previous commitOffset failed
and a fresh BE
+ * re-detects and re-executes the same DDL):
+ *
+ * <ul>
+ * <li>ADD COLUMN — "Can not add column which already exists": column
was already added.
+ * <li>DROP COLUMN — "Column does not exists": column was already
dropped.
+ * </ul>
+ */
+ private static void parseResponse(String sql, String responseBody) throws
IOException {
+ JsonNode root = OBJECT_MAPPER.readTree(responseBody);
+ JsonNode code = root.get("code");
+ if (code != null && code.asInt() == 0) {
+ return;
+ }
+
+ String msg = root.path("msg").asText("");
+
+ if (msg.contains(COLUMN_EXISTS_MSG)) {
+ LOG.warn("[DDL-IDEMPOTENT] Skipped ADD COLUMN (column already
exists). SQL: {}", sql);
+ return;
+ }
+ if (msg.contains(COLUMN_NOT_EXISTS_MSG)) {
+ LOG.warn("[DDL-IDEMPOTENT] Skipped DROP COLUMN (column already
absent). SQL: {}", sql);
+ return;
+ }
+
+ LOG.warn("DDL execution failed. SQL: {}. Response: {}", sql,
responseBody);
+ throw new IOException("Failed to execute schema change: " +
responseBody);
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
new file mode 100644
index 00000000000..b71fe609d4a
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit tests for {@link SchemaChangeHelper#pgTypeNameToDorisType}. */
+class SchemaChangeHelperTest {
+
+ // ─── Integer types
────────────────────────────────────────────────────────
+
+ @Test
+ void integerTypes() {
+ assertEquals(DorisType.SMALLINT, map("int2", -1, -1));
+ assertEquals(DorisType.SMALLINT, map("smallserial", -1, -1));
+ assertEquals(DorisType.INT, map("int4", -1, -1));
+ assertEquals(DorisType.INT, map("serial", -1, -1));
+ assertEquals(DorisType.BIGINT, map("int8", -1, -1));
+ assertEquals(DorisType.BIGINT, map("bigserial", -1, -1));
+ }
+
+ @Test
+ void floatTypes() {
+ assertEquals(DorisType.FLOAT, map("float4", -1, -1));
+ assertEquals(DorisType.DOUBLE, map("float8", -1, -1));
+ }
+
+ // ─── Boolean / bit
───────────────────────────────────────────────────────
+
+ @Test
+ void boolType() {
+ assertEquals(DorisType.BOOLEAN, map("bool", -1, -1));
+ }
+
+ @Test
+ void bitType_singleBit_isBoolean() {
+ assertEquals(DorisType.BOOLEAN, map("bit", 1, -1));
+ }
+
+ @Test
+ void bitType_multiBit_isString() {
+ assertEquals(DorisType.STRING, map("bit", 8, -1));
+ assertEquals(DorisType.STRING, map("bit", 64, -1));
+ }
+
+ // ─── Numeric / decimal
───────────────────────────────────────────────────
+
+ @Test
+ void numericType_defaultPrecisionScale() {
+ // length <= 0, scale < 0 → DECIMAL(38, 9)
+ assertEquals("DECIMAL(38, 9)", map("numeric", 0, -1));
+ assertEquals("DECIMAL(38, 9)", map("numeric", -1, -1));
+ }
+
+ @Test
+ void numericType_explicitPrecisionScale() {
+ assertEquals("DECIMAL(10, 2)", map("numeric", 10, 2));
+ assertEquals("DECIMAL(5, 0)", map("numeric", 5, 0));
+ }
+
+ @Test
+ void numericType_precisionCappedAt38() {
+ assertEquals("DECIMAL(38, 4)", map("numeric", 50, 4));
+ assertEquals("DECIMAL(38, 9)", map("numeric", 100, -1));
+ }
+
+ // ─── Char types
──────────────────────────────────────────────────────────
+
+ @Test
+ void bpchar_shortLength_isChar() {
+ // length=10 → 10*3=30 ≤ 255 → CHAR(30)
+ assertEquals("CHAR(30)", map("bpchar", 10, -1));
+ assertEquals("CHAR(3)", map("bpchar", 1, -1));
+ }
+
+ @Test
+ void bpchar_longLength_isVarchar() {
+ // length=100 → 100*3=300 > 255 → VARCHAR(300)
+ assertEquals("VARCHAR(300)", map("bpchar", 100, -1));
+ assertEquals("VARCHAR(768)", map("bpchar", 256, -1));
+ }
+
+ @Test
+ void varcharAndText_isString() {
+ assertEquals(DorisType.STRING, map("varchar", 50, -1));
+ assertEquals(DorisType.STRING, map("varchar", -1, -1));
+ assertEquals(DorisType.STRING, map("text", -1, -1));
+ }
+
+ // ─── Date / time
─────────────────────────────────────────────────────────
+
+ @Test
+ void dateType() {
+ assertEquals(DorisType.DATE, map("date", -1, -1));
+ }
+
+ @Test
+ void timestampType_defaultScale_isDatetime6() {
+ // scale < 0 or > 6 → default to 6
+ assertEquals("DATETIME(6)", map("timestamp", -1, -1));
+ assertEquals("DATETIME(6)", map("timestamptz", -1, -1));
+ assertEquals("DATETIME(6)", map("timestamp", -1, 7));
+ }
+
+ @Test
+ void timestampType_explicitScale() {
+ assertEquals("DATETIME(3)", map("timestamp", -1, 3));
+ assertEquals("DATETIME(0)", map("timestamptz", -1, 0));
+ assertEquals("DATETIME(6)", map("timestamp", -1, 6));
+ }
+
+ @Test
+ void timeTypes_isString() {
+ assertEquals(DorisType.STRING, map("time", -1, -1));
+ assertEquals(DorisType.STRING, map("timetz", -1, -1));
+ assertEquals(DorisType.STRING, map("interval", -1, -1));
+ }
+
+ // ─── JSON
────────────────────────────────────────────────────────────────
+
+ @Test
+ void jsonTypes() {
+ assertEquals(DorisType.JSON, map("json", -1, -1));
+ assertEquals(DorisType.JSON, map("jsonb", -1, -1));
+ }
+
+ // ─── Geometric / network / misc types (all map to STRING)
────────────────
+
+ @Test
+ void networkAndMiscTypes_isString() {
+ assertEquals(DorisType.STRING, map("inet", -1, -1));
+ assertEquals(DorisType.STRING, map("cidr", -1, -1));
+ assertEquals(DorisType.STRING, map("macaddr", -1, -1));
+ assertEquals(DorisType.STRING, map("uuid", -1, -1));
+ assertEquals(DorisType.STRING, map("bytea", -1, -1));
+ assertEquals(DorisType.STRING, map("varbit", -1, -1));
+ }
+
+ @Test
+ void geometricTypes_isString() {
+ assertEquals(DorisType.STRING, map("point", -1, -1));
+ assertEquals(DorisType.STRING, map("line", -1, -1));
+ assertEquals(DorisType.STRING, map("lseg", -1, -1));
+ assertEquals(DorisType.STRING, map("box", -1, -1));
+ assertEquals(DorisType.STRING, map("path", -1, -1));
+ assertEquals(DorisType.STRING, map("polygon", -1, -1));
+ assertEquals(DorisType.STRING, map("circle", -1, -1));
+ }
+
+ // ─── Unknown type fallback
───────────────────────────────────────────────
+
+ @Test
+ void unknownType_defaultsToString() {
+ assertEquals(DorisType.STRING, map("custom_type", -1, -1));
+ assertEquals(DorisType.STRING, map("user_defined_enum", -1, -1));
+ }
+
+ // ─── Case-insensitive matching
────────────────────────────────────────────
+
+ @Test
+ void caseInsensitive() {
+ assertEquals(DorisType.INT, map("INT4", -1, -1));
+ assertEquals(DorisType.BIGINT, map("INT8", -1, -1));
+ assertEquals(DorisType.BOOLEAN, map("BOOL", -1, -1));
+ assertEquals(DorisType.FLOAT, map("FLOAT4", -1, -1));
+ assertEquals(DorisType.JSON, map("JSON", -1, -1));
+ assertEquals(DorisType.STRING, map("TEXT", -1, -1));
+ }
+
+ // ─── helper
──────────────────────────────────────────────────────────────
+
+ private static String map(String pgType, int length, int scale) {
+ return SchemaChangeHelper.pgTypeNameToDorisType(pgType, length, scale);
+ }
+}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
new file mode 100644
index 00000000000..a0f35f02943
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
@@ -0,0 +1,32 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !snapshot --
+A1 1
+B1 2
+
+-- !add_column --
+A1 1 \N
+B1 2 \N
+C1 10 hello
+
+-- !add_column_dml --
+B1 99 updated
+C1 10 world
+
+-- !drop_column --
+B1 99
+C1 10
+D1 20
+
+-- !rename --
+B1 99
+C1 10
+D1 20
+E1 \N
+
+-- !modify --
+B1 99
+C1 10
+D1 20
+E1 \N
+F1 \N
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
new file mode 100644
index 00000000000..24907705c46
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !baseline --
+C1 30
+
+-- !double_add --
+C1 30 \N \N
+D1 40 hello 42
+
+-- !rename_guard --
+C1 30 \N \N
+D1 40 hello 42
+E1 50 \N 10
+
+-- !rename_guard_update --
+C1 30 \N \N
+D1 99 \N 42
+E1 50 \N 10
+
+-- !default_col --
+C1 30 default_val
+D1 99 default_val
+E1 50 default_val
+F1 60 default_val
+
+-- !not_null_col --
+C1 30 default_val required
+D1 99 default_val required
+E1 50 default_val required
+F1 60 default_val required
+G1 70 g1c4 explicit
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
new file mode 100644
index 00000000000..bd590e1d97f
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Schema-change regression for the PostgreSQL CDC streaming job.
+ *
+ * Covers four scenarios in sequence on a single table:
+ * 1. ADD COLUMN – column added in PG → DDL executed in Doris, new data
lands correctly.
+ * Also verifies: pre-ADD rows get NULL for the new
column (existing-data
+ * correctness), and UPDATE/DELETE right after ADD COLUMN
are propagated.
+ * 2. DROP COLUMN – column dropped in PG → DDL executed in Doris,
subsequent data lands correctly.
+ * 3. RENAME COLUMN – rename detected as simultaneous ADD+DROP (rename
guard) →
+ * no DDL in Doris, 'age' column remains, new rows get
age=NULL.
+ * 4. MODIFY COLUMN – type-only change is invisible to the name-based diff →
+ * no DDL in Doris, data continues to flow.
+ */
+suite("test_streaming_postgres_job_sc",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_name_sc"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_normal1_sc"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ── helpers
───────────────────────────────────────────────────────────
+
+ // Wait until a specific row appears in the Doris target table.
+ def waitForRow = { String rowName ->
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+ )[0][0] as int > 0
+ })
+ }
+
+ // Wait until a column either exists or no longer exists in the Doris
table.
+ def waitForColumn = { String colName, boolean shouldExist ->
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def desc = sql "DESC ${table1}"
+ desc.any { it[0] == colName } == shouldExist
+ })
+ }
+
+ // Wait until a specific row disappears from the Doris target table.
+ def waitForRowGone = { String rowName ->
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+ )[0][0] as int == 0
+ })
+ }
+
+ // Wait until a specific column value matches the expected value for a
row.
+ // Comparison is done as strings to avoid JDBC numeric type mismatches
(e.g. Short vs Integer).
+ def waitForValue = { String rowName, String colName, Object expected ->
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql "SELECT ${colName} FROM ${table1} WHERE
name='${rowName}'"
+ rows.size() == 1 && String.valueOf(rows[0][0]) ==
String.valueOf(expected)
+ })
+ }
+
+ // Dump job/task state on assertion failures for easier debugging.
+ def dumpJobState = {
+ log.info("jobs : " + sql("""select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks : " + sql("""select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ }
+
+ // ── 0. Create PG table and insert snapshot rows
───────────────────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+ sql """CREATE TABLE ${pgSchema}.${table1} (
+ name VARCHAR(200) PRIMARY KEY,
+ age INT2
+ )"""
+ sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 1)"""
+ sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 2)"""
+ }
+
+ // ── 1. Start streaming job
────────────────────────────────────────────
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Verify the table was auto-created with the expected initial schema.
+ assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size()
== 1
+ // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4),
Extra(5)
+ def initDesc = sql "DESC ${currentDb}.${table1}"
+ assert initDesc.find { it[0] == 'name' }[1] == 'varchar(65533)' :
"name must be varchar(65533)"
+ assert initDesc.find { it[0] == 'age' }[1] == 'smallint' : "age
must be smallint"
+ assert initDesc.find { it[0] == 'name' }[3] == 'true' :
"name must be primary key"
+
+ // Wait for snapshot to finish (job completes ≥ 2 tasks).
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(1,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert")
+ where Name='${jobName}' and
ExecuteType='STREAMING'"""
+ cnt.size() == 1 && cnt[0][0] as int >= 2
+ })
+ } catch (Exception ex) {
+ dumpJobState()
+ throw ex
+ }
+
+ // Snapshot data: A1(1), B1(2)
+ qt_snapshot """ SELECT name, age FROM ${table1} ORDER BY name """
+
+ // ── Phase 1: ADD COLUMN c1
────────────────────────────────────────────
+ // PG adds VARCHAR column c1; CDC detects ADD via name diff and
executes
+ // ALTER TABLE … ADD COLUMN c1 on Doris.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1
VARCHAR(50)"""
+ sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1) VALUES
('C1', 10, 'hello')"""
+ }
+
+ try {
+ waitForColumn('c1', true)
+ waitForRow('C1')
+ } catch (Exception ex) {
+ dumpJobState()
+ throw ex
+ }
+
+ // Verify c1 was added to Doris and the new row is present.
+ assert (sql "DESC ${table1}").any { it[0] == 'c1' } : "c1 column must
exist in Doris after ADD COLUMN"
+
+ // Pre-ADD rows must have NULL for the new column (existing-data
correctness).
+ assert (sql "SELECT c1 FROM ${table1} WHERE name='A1'")[0][0] == null
: "A1.c1 must be NULL (pre-ADD row)"
+ assert (sql "SELECT c1 FROM ${table1} WHERE name='B1'")[0][0] == null
: "B1.c1 must be NULL (pre-ADD row)"
+
+ // A1(1,null), B1(2,null), C1(10,'hello')
+ qt_add_column """ SELECT name, age, c1 FROM ${table1} ORDER BY name """
+
+ // ── Phase 1b: UPDATE / DELETE immediately after ADD COLUMN
───────────
+ // Verifies that UPDATE (touching the new column) and DELETE on
pre-existing rows
+ // are correctly propagated to Doris after the schema change.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ // Update the new column on the just-inserted row.
+ sql """UPDATE ${pgSchema}.${table1} SET c1='world' WHERE
name='C1'"""
+ // Update both an old column and the new column on a pre-existing
row.
+ sql """UPDATE ${pgSchema}.${table1} SET age=99, c1='updated' WHERE
name='B1'"""
+ // Delete a pre-existing row.
+ sql """DELETE FROM ${pgSchema}.${table1} WHERE name='A1'"""
+ }
+
+ try {
+ waitForRowGone('A1')
+ waitForValue('B1', 'age', 99)
+ waitForValue('C1', 'c1', 'world')
+ } catch (Exception ex) {
+ dumpJobState()
+ throw ex
+ }
+
+ // A1 deleted; B1(99,'updated'); C1(10,'world')
+ qt_add_column_dml """ SELECT name, age, c1 FROM ${table1} ORDER BY
name """
+
+ // ── Phase 2: DROP COLUMN c1
───────────────────────────────────────────
+ // PG drops c1; CDC detects DROP and executes ALTER TABLE … DROP
COLUMN c1 on Doris.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """ALTER TABLE ${pgSchema}.${table1} DROP COLUMN c1"""
+ sql """INSERT INTO ${pgSchema}.${table1} (name, age) VALUES ('D1',
20)"""
+ }
+
+ try {
+ waitForColumn('c1', false)
+ waitForRow('D1')
+ } catch (Exception ex) {
+ dumpJobState()
+ throw ex
+ }
+
+ // Verify c1 was removed from Doris and data flows without it.
+ assert !(sql "DESC ${table1}").any { it[0] == 'c1' } : "c1 column must
be gone from Doris after DROP COLUMN"
+ // B1(99), C1(10), D1(20) [A1 was deleted in Phase 1b]
+ qt_drop_column """ SELECT name, age FROM ${table1} ORDER BY name """
+
+ // ── Phase 3: RENAME COLUMN age → age2 (rename guard)
─────────────────
+ // PG rename looks like a simultaneous ADD(age2) + DROP(age) to the
name diff.
+ // The rename guard detects this and emits a WARN with no DDL, so
Doris schema
+ // is unchanged. New PG rows carry 'age2' which has no matching
column in Doris,
+ // so 'age' is NULL for those rows.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """ALTER TABLE ${pgSchema}.${table1} RENAME COLUMN age TO
age2"""
+ sql """INSERT INTO ${pgSchema}.${table1} (name, age2) VALUES
('E1', 30)"""
+ }
+
+ try {
+ waitForRow('E1')
+ } catch (Exception ex) {
+ dumpJobState()
+ throw ex
+ }
+
+ // 'age' must still exist; 'age2' must NOT have been added.
+ def descAfterRename = sql "DESC ${table1}"
+ assert descAfterRename.any { it[0] == 'age' } : "'age' column must
remain after rename guard"
+ assert !descAfterRename.any { it[0] == 'age2' } : "'age2' must NOT be
added (rename guard, no DDL)"
+ // B1(99), C1(10), D1(20), E1(null) — age=NULL because PG sends age2
which Doris ignores
+ qt_rename """ SELECT name, age FROM ${table1} ORDER BY name """
+
+ // ── Phase 4: MODIFY COLUMN type (name-only diff, no DDL)
─────────────
+ // Type-only change is invisible to the name-based diff, so no DDL is
emitted.
+ // Data continues to flow; age2 values still have no mapping in Doris
→ age=NULL.
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """ALTER TABLE ${pgSchema}.${table1} ALTER COLUMN age2 TYPE
INT4"""
+ sql """INSERT INTO ${pgSchema}.${table1} (name, age2) VALUES
('F1', 50)"""
+ }
+
+ try {
+ waitForRow('F1')
+ } catch (Exception ex) {
+ dumpJobState()
+ throw ex
+ }
+
+ // Doris 'age' column type must remain smallint (mapped from PG int2).
+ assert (sql "DESC ${table1}").find { it[0] == 'age' }[1] == 'smallint'
\
+ : "Doris 'age' type must remain smallint after type-only change in
PG"
+ // B1(99), C1(10), D1(20), E1(null), F1(null)
+ qt_modify """ SELECT name, age FROM ${table1} ORDER BY name """
+
+ assert (sql """select * from jobs("type"="insert") where
Name='${jobName}'""")[0][5] == "RUNNING"
+
+ // ── Cleanup
───────────────────────────────────────────────────────────
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ assert (sql """select count(1) from jobs("type"="insert") where
Name='${jobName}'""")[0][0] == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
new file mode 100644
index 00000000000..6593f1cf4f2
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Advanced schema-change regression for the PostgreSQL CDC streaming job.
+ *
+ * Key differences from the basic schema-change test:
+ * - Uses offset=latest (incremental-only, no snapshot) to cover the code
path where
+ * tableSchemas are discovered from PG JDBC rather than derived from
snapshot splits.
+ * This exercises the feHadNoSchema=true branch in PipelineCoordinator.
+ *
+ * Covers uncommon scenarios:
+ * 1. Simultaneous double ADD – two columns added in PG before any DML
triggers detection;
+ * both ALTER TABLEs are generated and executed in a single detection
event.
+ * 2. DROP + ADD simultaneously (rename guard) – dropping one column while
adding another
+ * is treated as a potential rename; no DDL is emitted but the cached
schema is updated.
+ * 3. UPDATE on existing rows after rename guard – verifies that a row whose
old column (c1)
+ * was dropped in PG gets c1=NULL in Doris after the next UPDATE (stream
load replaces the
+ * whole row without c1 since PG no longer has it).
+ * 4. ADD COLUMN with DEFAULT value – verifies that the DEFAULT clause is
passed through to
+ * Doris and that pre-existing rows automatically receive the default
value after the DDL.
+ * 5. ADD COLUMN NOT NULL with DEFAULT – verifies the NOT NULL path in
SchemaChangeHelper
+ * (col.isOptional()=false → appends NOT NULL) and that Doris accepts the
DDL when a
+ * DEFAULT is present (satisfying the NOT NULL constraint for existing
rows).
+ */
suite("test_streaming_postgres_job_sc_advanced",
        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {

    def jobName = "test_streaming_pg_sc_advanced"
    def currentDb = (sql "select database()")[0][0]
    def table1 = "user_info_pg_normal1_sc_adv"
    def pgDB = "postgres"
    def pgSchema = "cdc_test"
    def pgUser = "postgres"
    def pgPassword = "123456"

    // Start from a clean slate: no leftover job or Doris target table from
    // a previous (possibly failed) run.
    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${table1} force"""

    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String pg_port = context.config.otherConfigs.get("pg_14_port")
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String s3_endpoint = getS3Endpoint()
        String bucket = getS3BucketName()
        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"

        // ── helpers ───────────────────────────────────────────────────────────

        // Poll until a row with the given name appears in the Doris table.
        def waitForRow = { String rowName ->
            Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'")[0][0] as int > 0
            })
        }

        // Poll until a row with the given name has disappeared from Doris.
        def waitForRowGone = { String rowName ->
            Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'")[0][0] as int == 0
            })
        }

        // Poll until the column does (or does not) exist in the Doris schema.
        def waitForColumn = { String colName, boolean shouldExist ->
            Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
                def desc = sql "DESC ${table1}"
                desc.any { it[0] == colName } == shouldExist
            })
        }

        // Comparison is done as strings to avoid JDBC numeric type mismatches.
        def waitForValue = { String rowName, String colName, Object expected ->
            Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
                def rows = sql "SELECT ${colName} FROM ${table1} WHERE name='${rowName}'"
                rows.size() == 1 && String.valueOf(rows[0][0]) == String.valueOf(expected)
            })
        }

        // Dump job/task state to the log for post-mortem when a wait times out.
        def dumpJobState = {
            log.info("jobs : " + sql("""select * from jobs("type"="insert") where Name='${jobName}'"""))
            log.info("tasks : " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'"""))
        }

        // ── 0. Pre-create PG table with existing rows ─────────────────────────
        // A1, B1 are inserted BEFORE the job starts with offset=latest.
        // They will NOT appear in Doris (no snapshot taken).
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
            sql """CREATE TABLE ${pgSchema}.${table1} (
                    name VARCHAR(200) PRIMARY KEY,
                    age INT4
            )"""
            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 10)"""
            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 20)"""
        }

        // ── 1. Start streaming job with offset=latest ─────────────────────────
        // The Doris table is auto-created from the PG schema at job creation time.
        // Streaming begins from the current WAL LSN — A1 and B1 are not captured.
        sql """CREATE JOB ${jobName}
                ON STREAMING
                FROM POSTGRES (
                    "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
                    "driver_url" = "${driver_url}",
                    "driver_class" = "org.postgresql.Driver",
                    "user" = "${pgUser}",
                    "password" = "${pgPassword}",
                    "database" = "${pgDB}",
                    "schema" = "${pgSchema}",
                    "include_tables" = "${table1}",
                    "offset" = "latest"
                )
                TO DATABASE ${currentDb} (
                    "table.create.properties.replication_num" = "1"
                )"""

        // Target table must have been auto-created by the job.
        assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() == 1

        // Wait for job to enter RUNNING state (streaming split established).
        try {
            Awaitility.await().atMost(120, SECONDS).pollInterval(1, SECONDS).until({
                def rows = sql """select Status from jobs("type"="insert")
                                where Name='${jobName}' and ExecuteType='STREAMING'"""
                rows.size() == 1 && rows[0][0] == "RUNNING"
            })
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        // Baseline: insert C1 to verify streaming is active.
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('C1', 30)"""
        }

        try {
            waitForRow('C1')
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        // A1, B1 must NOT be present (offset=latest, no snapshot).
        assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='A1'")[0][0] as int == 0 \
            : "A1 must not be present (offset=latest)"
        assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='B1'")[0][0] as int == 0 \
            : "B1 must not be present (offset=latest)"

        // Only C1(30) should be in Doris.
        qt_baseline """ SELECT name, age FROM ${table1} ORDER BY name """

        // ── Phase 1: Simultaneous double ADD (c1 TEXT, c2 INT4) ──────────────
        // Both ALTER TABLEs happen in PG before any DML triggers CDC detection.
        // The single INSERT D1 triggers the detection, which fetches the fresh PG schema
        // (already containing both c1 and c2), and generates two ADD COLUMN DDLs in one shot.
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 TEXT"""
            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c2 INT4"""
            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1, c2) VALUES ('D1', 40, 'hello', 42)"""
        }

        try {
            waitForColumn('c1', true)
            waitForColumn('c2', true)
            waitForRow('D1')
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), Extra(5)
        def descAfterDoubleAdd = sql "DESC ${table1}"
        assert descAfterDoubleAdd.find { it[0] == 'c1' }[1] == 'text' : "c1 must be added as text"
        assert descAfterDoubleAdd.find { it[0] == 'c2' }[1] == 'int' : "c2 must be added as int"

        // Pre-double-ADD row C1 must have NULL for both new columns.
        assert (sql "SELECT c1 FROM ${table1} WHERE name='C1'")[0][0] == null : "C1.c1 must be NULL"
        assert (sql "SELECT c2 FROM ${table1} WHERE name='C1'")[0][0] == null : "C1.c2 must be NULL"

        // C1(30,null,null), D1(40,'hello',42)
        qt_double_add """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY name """

        // ── Phase 2: DROP c1 + ADD c3 simultaneously (rename guard) ──────────
        // Dropping c1 and adding c3 in the same batch looks like a rename to the CDC detector:
        // simultaneous ADD+DROP triggers the guard → no DDL emitted, cached schema updated to
        // reflect the fresh PG state (c1 gone, c3 present).
        // Doris table is left with c1 still present; c3 is never added.
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            sql """ALTER TABLE ${pgSchema}.${table1} DROP COLUMN c1"""
            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c3 INT4"""
            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3) VALUES ('E1', 50, 10, 99)"""
        }

        try {
            waitForRow('E1')
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        def descAfterRenameGuard = sql "DESC ${table1}"
        assert descAfterRenameGuard.any { it[0] == 'c1' } : "c1 must remain (rename guard prevented DROP)"
        assert !descAfterRenameGuard.any { it[0] == 'c3' } : "c3 must NOT be added (rename guard prevented ADD)"

        // E1.c1=NULL: PG has c3 (not c1), Doris ignores c3 and writes NULL for c1.
        assert (sql "SELECT c1 FROM ${table1} WHERE name='E1'")[0][0] == null : "E1.c1 must be NULL"

        // C1(30,null,null), D1(40,'hello',42), E1(50,null,10)
        qt_rename_guard """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY name """

        // ── Phase 3: UPDATE existing row after rename guard ───────────────────
        // D1 had c1='hello' at insert time. After the rename guard fires, the cached schema
        // reflects PG reality (c1 gone, c3 present). When D1 is updated in PG (only c3 exists
        // for non-key columns), the DML record carries no c1 field. Stream load replaces the
        // entire row → D1.c1 becomes NULL in Doris.
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            // PG now has columns: name, age, c2, c3 (c1 was dropped)
            sql """UPDATE ${pgSchema}.${table1} SET age=99, c3=88 WHERE name='D1'"""
        }

        try {
            waitForValue('D1', 'age', 99)
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        // D1.c1 was 'hello' but after UPDATE the stream load has no c1 field →
        // Doris replaces the row without c1 → c1=NULL.
        assert (sql "SELECT c1 FROM ${table1} WHERE name='D1'")[0][0] == null \
            : "D1.c1 must be NULL after UPDATE (c1 dropped from PG, not in stream load record)"

        // C1(30,null,null), D1(99,null,null), E1(50,null,10)
        qt_rename_guard_update """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY name """

        // ── Phase 4: ADD COLUMN with DEFAULT value ────────────────────────────
        // PG adds a nullable TEXT column with a DEFAULT value.
        // buildAddColumnSql picks up col.defaultValueExpression() and appends DEFAULT 'default_val'
        // to the Doris ALTER TABLE. After the DDL, Doris fills the default for all pre-existing
        // rows (metadata operation), so C1/D1/E1 all get c4='default_val' without any DML replay.
        // F1 is inserted without an explicit c4 value → PG fills in the default → WAL record
        // already carries c4='default_val', so Doris writes 'default_val' for F1 as well.
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            // PG current cols: name, age, c2, c3
            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c4 TEXT DEFAULT 'default_val'"""
            // Trigger schema-change detection; omit c4 → PG fills default in WAL record.
            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3) VALUES ('F1', 60, 20, 77)"""
        }

        try {
            waitForColumn('c4', true)
            waitForRow('F1')
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), Extra(5)
        def descAfterDefaultAdd = sql "DESC ${table1}"
        def c4Row = descAfterDefaultAdd.find { it[0] == 'c4' }
        assert c4Row != null : "c4 must be added"
        assert c4Row[4] == 'default_val' : "c4 must carry DEFAULT 'default_val', got: ${c4Row[4]}"

        // Pre-existing rows receive the default value from Doris's ALTER TABLE (not from DML replay).
        try {
            waitForValue('C1', 'c4', 'default_val')
            waitForValue('D1', 'c4', 'default_val')
            waitForValue('E1', 'c4', 'default_val')
            waitForValue('F1', 'c4', 'default_val')
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        // C1(30,_,default_val), D1(99,_,default_val), E1(50,_,default_val), F1(60,_,default_val)
        qt_default_col """ SELECT name, age, c4 FROM ${table1} ORDER BY name """

        // ── Phase 5: ADD COLUMN NOT NULL with DEFAULT ─────────────────────────
        // In PG, adding a NOT NULL column to a non-empty table requires a DEFAULT so existing rows
        // satisfy the constraint. Debezium captures col.isOptional()=false, so SchemaChangeHelper
        // appends NOT NULL to the Doris column type, and the DEFAULT clause is also passed through.
        // With both NOT NULL and DEFAULT, Doris can apply the DDL: existing rows get the default
        // value (satisfying NOT NULL), and new rows must supply a value or receive the default.
        connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
            // PG current cols: name, age, c2, c3, c4
            sql """ALTER TABLE ${pgSchema}.${table1}
                    ADD COLUMN c5 TEXT NOT NULL DEFAULT 'required'"""
            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3, c4, c5)
                    VALUES ('G1', 70, 30, 66, 'g1c4', 'explicit')"""
        }

        try {
            waitForColumn('c5', true)
            waitForRow('G1')
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), Extra(5)
        def descAfterNotNullAdd = sql "DESC ${table1}"
        def c5Row = descAfterNotNullAdd.find { it[0] == 'c5' }
        assert c5Row != null : "c5 must be added"
        assert c5Row[4] == 'required' : "c5 must carry DEFAULT 'required', got: ${c5Row[4]}"

        // Pre-existing rows must have the default value (Doris ALTER TABLE fills it).
        // G1 was inserted with an explicit 'explicit' value.
        try {
            waitForValue('C1', 'c5', 'required')
            waitForValue('D1', 'c5', 'required')
            waitForValue('G1', 'c5', 'explicit')
        } catch (Exception ex) {
            dumpJobState()
            throw ex
        }

        // C1(_,default_val,required), D1(_,default_val,required), ...G1(_,g1c4,explicit)
        qt_not_null_col """ SELECT name, age, c4, c5 FROM ${table1} ORDER BY name """

        // Job must still be healthy after all schema-change phases.
        // NOTE(review): assumes column index 5 of jobs("type"="insert") is Status — confirm
        // against the jobs() TVF layout if this assertion ever breaks.
        assert (sql """select * from jobs("type"="insert") where Name='${jobName}'""")[0][5] == "RUNNING"

        // ── Cleanup ───────────────────────────────────────────────────────────
        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        assert (sql """select count(1) from jobs("type"="insert") where Name='${jobName}'""")[0][0] == 0
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]