github-actions[bot] commented on code in PR #61182:
URL: https://github.com/apache/doris/pull/61182#discussion_r2916505813


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java:
##########
@@ -52,4 +52,8 @@ protected boolean isRedirectable(String method) {
                 .addInterceptorLast(new RequestContent(true))
                 .build();
     }
+
+    public static String getAuthHeader() {
+        return "Basic YWRtaW46";

Review Comment:
   **[Medium]** `getAuthHeader()` returns a hardcoded `"Basic YWRtaW46"` which 
is base64 of `admin:` (user=admin, empty password). This was a pre-existing 
pattern in `HttpPutBuilder`, but now that it's centralized here and also used 
by `SchemaChangeManager` for DDL execution, it becomes more prominent.
   
   The `SchemaChangeManager.buildHttpPost()` already passes a `token` parameter 
and sets it as a header — so the `Authorization: Basic ...` header may be 
redundant if token-based auth is sufficient. But if Basic auth is required, 
this should be parameterized from the job's configured credentials rather than 
hardcoded.
   
   Not a regression (same behavior as before), but worth tracking as tech debt.
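
   If Basic auth is kept, the parameterized form could look like the sketch below. This is a hedged illustration, not code from the PR: `basicAuthHeader` is a hypothetical helper, and the user/password would come from the job's configured credentials.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Base64;

   public class AuthHeaderSketch {
       // Hypothetical replacement for the hardcoded constant: derive the Basic
       // credential from configured credentials instead of always "admin:".
       static String basicAuthHeader(String user, String password) {
           String credentials = user + ":" + (password == null ? "" : password);
           return "Basic " + Base64.getEncoder()
                   .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
       }

       public static void main(String[] args) {
           // "admin" with an empty password reproduces the current hardcoded value.
           System.out.println(basicAuthHeader("admin", ""));  // Basic YWRtaW46
       }
   }
   ```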



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -359,6 +368,29 @@ protected Map<TableId, TableChanges.TableChange> discoverTableSchemas(JobBaseCon
         }
     }
 
+    /**
+     * Fetch the current schema for a single table directly from PostgreSQL via JDBC.
+     *
+     * <p>Called by {@link PostgresDebeziumJsonDeserializer} when a schema change (ADD/DROP column)
+     * is detected, to obtain accurate PG column types for DDL generation.
+     *
+     * @return the fresh {@link TableChanges.TableChange}
+     */
+    private TableChanges.TableChange refreshSingleTableSchema(
+            TableId tableId, Map<String, String> config, long jobId) {
+        PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId, 0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(sourceConfig)) {
+            CustomPostgresSchema customPostgresSchema =
+                    new CustomPostgresSchema((PostgresConnection) jdbcConnection, sourceConfig);
+            Map<TableId, TableChanges.TableChange> schemas =
+                    customPostgresSchema.getTableSchema(Collections.singletonList(tableId));
+            return schemas.get(tableId);
+        } catch (Exception e) {
+            throw new RuntimeException(e);

Review Comment:
   **[Low]** `refreshSingleTableSchema` catches `Exception` and wraps it in 
`RuntimeException`. Since this is called as the `pgSchemaRefresher` callback 
from `PostgresDebeziumJsonDeserializer.deserialize()`, the `RuntimeException` 
will bypass the `IOException` catch in the deserializer and propagate up to 
`PipelineCoordinator.writeRecords()` where it's caught by the generic 
`Exception` handler.
   
   This is functionally safe (the batch will fail and retry), but consider 
either:
   - Throwing a checked exception type so the deserializer can handle it 
explicitly, or
   - Wrapping in `RuntimeException` with a more descriptive message that 
includes the table ID and JDBC URL for debugging.
   
   The current error message from the RuntimeException wrapper loses the table 
context.
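
   The second option could be sketched as follows (hypothetical helper, not code from the PR; the real method would pass its own `tableId` and the JDBC URL from `sourceConfig`):

   ```java
   public class SchemaRefreshErrorSketch {
       // Hypothetical wrapper: keep the unchecked exception but carry the table
       // and connection context so the generic Exception handler in
       // PipelineCoordinator.writeRecords() logs something actionable.
       static RuntimeException refreshFailure(String tableIdentifier, String jdbcUrl, Exception cause) {
           return new RuntimeException(
                   "Failed to refresh PG schema for table " + tableIdentifier
                           + " via " + jdbcUrl + "; batch will fail and retry", cause);
       }

       public static void main(String[] args) {
           RuntimeException e = refreshFailure(
                   "public.orders", "jdbc:postgresql://host:5432/db", new Exception("boom"));
           System.out.println(e.getMessage());
       }
   }
   ```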



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java:
##########
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.SchemaChangeHelper;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP column only) by comparing
+ * the record's Kafka Connect schema field names with stored tableSchemas.
+ *
+ * <p>Because PostgreSQL does not emit DDL events in the WAL stream, schema detection is done by
+ * comparing the "after" struct field names in each DML record against the known column set.
+ *
+ * <p>Type comparison is intentionally skipped to avoid false positives caused by Kafka Connect type
+ * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column add or drop is
+ * detected, the accurate column types are fetched directly from PostgreSQL via the injected {@link
+ * #pgSchemaRefresher} callback.
+ *
+ * <p>MODIFY column type is not supported — users must manually execute ALTER TABLE ... MODIFY
+ * COLUMN in Doris when a PG column type changes.
+ */
+public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class);
+
+    /**
+     * Callback to fetch the current PG table schema for a single table via JDBC. Injected by {@link
+     * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader} after initialization.
+     */
+    private transient Function<TableId, TableChanges.TableChange> pgSchemaRefresher;
+
+    public void setPgSchemaRefresher(Function<TableId, TableChanges.TableChange> refresher) {
+        this.pgSchemaRefresher = refresher;
+    }
+
+    @Override
+    public DeserializeResult deserialize(Map<String, String> context, SourceRecord record)
+            throws IOException {
+        if (!RecordUtils.isDataChangeRecord(record)) {
+            return DeserializeResult.empty();
+        }
+
+        Schema valueSchema = record.valueSchema();
+        if (valueSchema == null) {
+            return super.deserialize(context, record);
+        }
+
+        Field afterField = valueSchema.field(Envelope.FieldName.AFTER);
+        if (afterField == null) {
+            return super.deserialize(context, record);
+        }
+
+        Schema afterSchema = afterField.schema();
+        TableId tableId = extractTableId(record);
+        TableChanges.TableChange stored = tableSchemas != null ? tableSchemas.get(tableId) : null;
+
+        // No baseline schema available — cannot detect changes, fall through to normal
+        // deserialization
+        if (stored == null || stored.getTable() == null) {
+            LOG.debug(
+                    "No stored schema for table {}, skipping schema change detection.",
+                    tableId.identifier());
+            return super.deserialize(context, record);
+        }
+
+        // First pass: name-only diff — fast, in-memory, no type comparison, no false positives
+        SchemaChangeHelper.SchemaDiff nameDiff =
+                SchemaChangeHelper.diffSchemaByName(afterSchema, stored);
+        if (nameDiff.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        Preconditions.checkNotNull(
+                pgSchemaRefresher,
+                "pgSchemaRefresher callback is not set. Cannot fetch fresh PG schema for change detection.");
+
+        // the last fresh schema
+        TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId);
+        if (fresh == null || fresh.getTable() == null) {
+            // Cannot proceed: DDL must be executed before the triggering DML record is written,
+            // otherwise new column data in this record would be silently dropped.
+            // Throwing here causes the batch to be retried from the same offset.
+            throw new IOException(
+                    "Failed to fetch fresh schema for table "
+                            + tableId.identifier()
+                            + "; cannot apply schema change safely. Will retry.");
+        }
+
+        // Second diff: use afterSchema as the source of truth for which columns the current WAL
+        // record is aware of. Only process additions/drops visible in afterSchema — columns that
+        // exist in fresh (JDBC) but are absent from afterSchema belong to a later DDL that has not
+        // yet produced a DML record, and will be processed when that DML record arrives.
+        //
+        // pgAdded: present in afterSchema but absent from stored → look up Column in fresh for
+        //          accurate PG type metadata. If fresh doesn't have the column yet (shouldn't
+        //          happen normally), skip it.
+        // pgDropped: present in stored but absent from afterSchema.
+        List<Column> pgAdded = new ArrayList<>();
+        List<String> pgDropped = new ArrayList<>();
+
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                Column freshCol = fresh.getTable().columnWithName(field.name());
+                if (freshCol != null) {
+                    pgAdded.add(freshCol);
+                }
+            }
+        }
+
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                pgDropped.add(col.name());
+            }
+        }
+
+        // Second diff is empty: nameDiff was a false positive (PG schema unchanged vs stored).
+        // This happens when pgSchemaRefresher returns a schema ahead of the current WAL position
+        // (e.g. a later DDL was already applied in PG while we're still consuming older DML
+        // records).
+        // No DDL needed, no tableSchema update, no extra stream load — just process the DML
+        // normally.
+        if (pgAdded.isEmpty() && pgDropped.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        // Build updatedSchemas from fresh filtered to afterSchema columns only, so that the stored
+        // cache does not jump ahead to include columns not yet seen by any DML record. Those
+        // unseen columns will trigger their own schema change when their first DML record arrives.
+        TableEditor editor = fresh.getTable().edit();
+        for (Column col : fresh.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                editor.removeColumn(col.name());
+            }
+        }
+        TableChanges.TableChange filteredChange =
+                new TableChanges.TableChange(TableChanges.TableChangeType.ALTER, editor.create());
+        Map<TableId, TableChanges.TableChange> updatedSchemas = new HashMap<>();
+        updatedSchemas.put(tableId, filteredChange);
+
+        // Rename guard: simultaneous ADD+DROP may be a column RENAME — skip DDL to avoid data loss.
+        // Users must manually RENAME the column in Doris.
+        if (!pgAdded.isEmpty() && !pgDropped.isEmpty()) {

Review Comment:
   **[Info — Good Design]** The rename guard (simultaneous ADD+DROP → skip DDL, 
update cached schema) is a well-designed safety measure. The decision to still 
return a `schemaChange` result with empty DDLs but updated schemas is correct — 
it advances the cached schema to reflect PG reality without emitting 
potentially destructive DDLs.
   
   The warning log message is clear and actionable, telling the user to 
manually rename the column in Doris.
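
   The guard's decision logic, stripped of the Debezium types, amounts to the following sketch (names and DDL strings are simplified stand-ins, not the PR's actual generator):

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   public class RenameGuardSketch {
       // Simplified stand-in for the deserializer's DDL decision: simultaneous
       // ADD+DROP may be a column rename, so no DDL is emitted (the caller still
       // advances the cached schema to match PG reality).
       static List<String> ddlsFor(String table, List<String> added, List<String> dropped) {
           if (!added.isEmpty() && !dropped.isEmpty()) {
               // Possible RENAME: skip DDL to avoid dropping data; the user
               // renames the column in Doris manually.
               return Collections.emptyList();
           }
           List<String> ddls = new ArrayList<>();
           for (String col : added) {
               ddls.add("ALTER TABLE " + table + " ADD COLUMN " + col);
           }
           for (String col : dropped) {
               ddls.add("ALTER TABLE " + table + " DROP COLUMN " + col);
           }
           return ddls;
       }
   }
   ```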



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java:
##########
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
+import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
+
+import org.apache.flink.cdc.connectors.base.utils.SerializerUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.debezium.document.Document;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import lombok.Getter;
+import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class providing common schema-tracking functionality for CDC source readers.
+ *
+ * <p>Handles serialization/deserialization of {@code tableSchemas} between FE and cdc_client, and
+ * provides a helper to load schemas from the incoming {@link JobBaseRecordRequest}.
+ */
+@Getter
+@Setter
+public abstract class AbstractCdcSourceReader implements SourceReader {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractCdcSourceReader.class);
+
+    protected static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    protected SourceRecordDeserializer<SourceRecord, DeserializeResult> serializer;
+    protected Map<TableId, TableChanges.TableChange> tableSchemas;
+
+    /**
+     * Load tableSchemas from a JSON string (produced by {@link #serializeTableSchemas()}). Used
+     * when a binlog/stream split is resumed from FE-persisted state.
+     *
+     * <p>Format: {@code [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+     */
+    public void loadTableSchemasFromJson(String json) throws IOException {
+        if (json == null || json.isEmpty()) {
+            return;
+        }
+        JsonNode root = OBJECT_MAPPER.readTree(json);
+        Map<TableId, TableChanges.TableChange> map = new ConcurrentHashMap<>();
+        DocumentReader docReader = DocumentReader.defaultReader();
+        for (JsonNode entry : root) {
+            boolean uc = entry.path("uc").asBoolean(false);
+            TableId tableId = TableId.parse(entry.get("i").asText(), uc);
+            Document doc = docReader.read(OBJECT_MAPPER.writeValueAsString(entry.get("c")));
+            TableChanges.TableChange change = FlinkJsonTableChangeSerializer.fromDocument(doc, uc);
+            map.put(tableId, change);
+        }
+        this.tableSchemas = map;
+        this.serializer.setTableSchemas(map);
+        LOG.info("Loaded {} table schemas from JSON", map.size());
+    }
+
+    /**
+     * Serialize current tableSchemas to a compact JSON string for FE persistence.
+     *
+     * <p>Stores the Debezium document as a nested JSON object (not a string) to avoid redundant
+     * escaping. Format: {@code
+     * [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+     */
+    @Override
+    public String serializeTableSchemas() {
+        if (tableSchemas == null || tableSchemas.isEmpty()) {
+            return null;
+        }
+        try {
+            DocumentWriter docWriter = DocumentWriter.defaultWriter();
+            ArrayNode result = OBJECT_MAPPER.createArrayNode();
+            for (Map.Entry<TableId, TableChanges.TableChange> e : tableSchemas.entrySet()) {
+                TableId tableId = e.getKey();
+                // useCatalogBeforeSchema: false when catalog is null but schema is set (e.g. PG)
+                boolean uc = SerializerUtils.shouldUseCatalogBeforeSchema(tableId);
+                ObjectNode entry = OBJECT_MAPPER.createObjectNode();
+                entry.put("i", tableId.toDoubleQuotedString());
+                entry.put("uc", uc);
+                // parse compact doc JSON into a JsonNode so "c" is a nested object, not a string
+                entry.set(
+                        "c",
+                        OBJECT_MAPPER.readTree(
+                                docWriter.write(TABLE_CHANGE_SERIALIZER.toDocument(e.getValue()))));
+                result.add(entry);
+            }
+            return OBJECT_MAPPER.writeValueAsString(result);
+        } catch (Exception e) {

Review Comment:
   **[Low]** `serializeTableSchemas()` catches `Exception` and returns `null` 
with a WARN log. This means if serialization fails (e.g., due to an unexpected 
Debezium document format), the tableSchemas won't be persisted to FE in this 
commit cycle.
   
   On the next batch, `feHadNoSchema` will be true (since FE has no schema), 
and `serializeTableSchemas()` will be called again. If the same serialization 
error persists, schemas will never be persisted — but the in-memory schema will 
still be correct for the current CDC client session.
   
   This is acceptable for robustness (don't fail the whole batch for a 
serialization issue), but consider logging at ERROR level since losing schema 
persistence could cause schema re-detection on restart.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static utility class for executing DDL schema changes on the Doris FE via HTTP. */
+public class SchemaChangeManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
+    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String COLUMN_EXISTS_MSG = "Can not add column which already exists";
+    private static final String COLUMN_NOT_EXISTS_MSG = "Column does not exists";
+
+    private SchemaChangeManager() {}
+
+    /**
+     * Execute a list of DDL statements on FE. Each statement is sent independently.
+     *
+     * <p>Idempotent errors (ADD COLUMN when column already exists, DROP COLUMN when column does not
+     * exist) are logged as warnings and silently skipped, so retries on a different BE after a
+     * failed commitOffset do not cause infinite failures.
+     *
+     * @param feAddr Doris FE address (host:port)
+     * @param db target database
+     * @param token FE auth token
+     * @param sqls DDL statements to execute
+     */
+    public static void executeDdls(String feAddr, String db, String token, List<String> sqls)
+            throws IOException {
+        if (sqls == null || sqls.isEmpty()) {
+            LOG.info("No DDL statements to execute");
+            return;
+        }
+        for (String stmt : sqls) {
+            stmt = stmt.trim();
+            if (stmt.isEmpty()) {
+                continue;
+            }
+            LOG.info("Executing DDL on FE {}: {}", feAddr, stmt);
+            execute(feAddr, db, token, stmt);
+        }
+    }
+
+    /**
+     * Execute a single SQL statement via the FE query API.
+     *
+     * <p>Idempotent errors are swallowed with a warning; all other errors throw {@link
+     * IOException}.
+     */
+    public static void execute(String feAddr, String db, String token, String sql)
+            throws IOException {
+        HttpPost post = buildHttpPost(feAddr, db, token, sql);
+        String responseBody = handleResponse(post);
+        LOG.info("Executed DDL {} with response: {}", sql, responseBody);
+        parseResponse(sql, responseBody);
+    }
+
+    // ─── Internal helpers ─────────────────────────────────────────────────────
+
+    private static HttpPost buildHttpPost(String feAddr, String db, String token, String sql)
+            throws IOException {
+        String url = String.format(SCHEMA_CHANGE_API, feAddr, db);
+        Map<String, Object> bodyMap = new HashMap<>();
+        bodyMap.put("stmt", sql);
+        String body = OBJECT_MAPPER.writeValueAsString(bodyMap);
+
+        HttpPost post = new HttpPost(url);
+        post.setHeader("Content-Type", "application/json;charset=UTF-8");
+        post.setHeader("Authorization", HttpUtil.getAuthHeader());
+        post.setHeader("token", token);
+        post.setEntity(new StringEntity(body, "UTF-8"));
+        return post;
+    }
+
+    private static String handleResponse(HttpPost request) throws IOException {
+        try (CloseableHttpClient client = HttpUtil.getHttpClient();

Review Comment:
   **[Low]** `handleResponse` creates a new `CloseableHttpClient` for each DDL 
statement via `HttpUtil.getHttpClient()`. Since DDLs are rare events, this is 
fine. But if multiple DDLs are generated in a single detection event (e.g., 
double ADD), each DDL gets its own HTTP client + connection. Consider passing 
the client from the caller loop in `executeDdls()` to reuse the connection.
   
   Not a functional issue — just a minor efficiency suggestion for the uncommon 
multi-DDL case.
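
   The suggested restructuring, shown abstractly (client creation is mocked with a counter; the real change would open one `CloseableHttpClient` in `executeDdls()` and pass it to each `execute()` call):

   ```java
   import java.util.List;

   public class ClientReuseSketch {
       static int clientsCreated = 0;

       // Stand-in for HttpUtil.getHttpClient(): only counts instantiations.
       static Object newClient() {
           clientsCreated++;
           return new Object();
       }

       // Current shape: each statement opens its own client.
       static void executePerStatement(List<String> sqls) {
           for (String stmt : sqls) {
               Object client = newClient();
               // ... build HttpPost for stmt and send with `client` ...
           }
       }

       // Suggested shape: one client shared across the whole batch.
       static void executeBatch(List<String> sqls) {
           Object client = newClient();
           for (String stmt : sqls) {
               // ... build HttpPost for stmt and send with `client` ...
           }
       }
   }
   ```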



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -309,15 +325,22 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
                     }
 
                     // Process data messages
-                    List<String> serializedRecords =
-                            sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                    if (!CollectionUtils.isEmpty(serializedRecords)) {
-                        String database = writeRecordRequest.getTargetDb();
+                    DeserializeResult result =
+                            sourceReader.deserialize(deserializeContext, element);
+
+                    if (result.getType() == DeserializeResult.Type.SCHEMA_CHANGE) {
+                        // Flush pending data before DDL
+                        batchStreamLoad.forceFlush();
+                        SchemaChangeManager.executeDdls(feAddr, targetDb, token, result.getDdls());
+                        hasExecuteDDL = true;
+                        sourceReader.applySchemaChange(result.getUpdatedSchemas());
+                        lastMessageIsHeartbeat = false;
+                    }
+                    if (!CollectionUtils.isEmpty(result.getRecords())) {

Review Comment:
   **[Info — Good Design]** The flow here correctly handles the case where a 
SCHEMA_CHANGE result also carries DML records:
   1. `forceFlush()` — flush any pending data from previous records
   2. `executeDdls()` — apply DDLs to Doris
   3. `applySchemaChange()` — update in-memory cached schema
   4. Fall through to `if (!CollectionUtils.isEmpty(result.getRecords()))` — 
write the triggering DML record
   
   This ordering ensures the DDL is applied before the DML that contains the 
new columns, preventing data loss. The `if` (not `else if`) on line 339 is 
intentional and correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

