github-actions[bot] commented on code in PR #61182:
URL: https://github.com/apache/doris/pull/61182#discussion_r2916505813
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java:
##########
@@ -52,4 +52,8 @@ protected boolean isRedirectable(String method)
                 .addInterceptorLast(new RequestContent(true))
                 .build();
     }
+
+    public static String getAuthHeader() {
+        return "Basic YWRtaW46";
Review Comment:
**[Medium]** `getAuthHeader()` returns a hardcoded `"Basic YWRtaW46"` which
is base64 of `admin:` (user=admin, empty password). This was a pre-existing
pattern in `HttpPutBuilder`, but now that it's centralized here and also used
by `SchemaChangeManager` for DDL execution, it becomes more prominent.
The `SchemaChangeManager.buildHttpPost()` already passes a `token` parameter
and sets it as a header — so the `Authorization: Basic ...` header may be
redundant if token-based auth is sufficient. But if Basic auth is required,
this should be parameterized from the job's configured credentials rather than
hardcoded.
Not a regression (same behavior as before), but worth tracking as tech debt.
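As a minimal sketch of the parameterized alternative suggested above (the class name and method signature are hypothetical — the real change would read the job's configured credentials):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class AuthHeaderSketch {
    // Hypothetical replacement for the hardcoded header: derive it from the
    // job's configured user/password instead of always encoding "admin:".
    public static String getAuthHeader(String user, String password) {
        String credentials = user + ":" + (password == null ? "" : password);
        return "Basic "
                + Base64.getEncoder()
                        .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // "admin" with an empty password reproduces the current hardcoded value.
        System.out.println(getAuthHeader("admin", ""));  // Basic YWRtaW46
    }
}
```

Keeping the `"admin"`/empty-password pair as the default would preserve today's behavior while letting jobs with real credentials override it.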
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -359,6 +368,29 @@ protected Map<TableId, TableChanges.TableChange> discoverTableSchemas(JobBaseCon
         }
     }
+    /**
+     * Fetch the current schema for a single table directly from PostgreSQL via JDBC.
+     *
+     * <p>Called by {@link PostgresDebeziumJsonDeserializer} when a schema change (ADD/DROP column)
+     * is detected, to obtain accurate PG column types for DDL generation.
+     *
+     * @return the fresh {@link TableChanges.TableChange}
+     */
+    private TableChanges.TableChange refreshSingleTableSchema(
+            TableId tableId, Map<String, String> config, long jobId) {
+        PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId, 0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(sourceConfig)) {
+            CustomPostgresSchema customPostgresSchema =
+                    new CustomPostgresSchema((PostgresConnection) jdbcConnection, sourceConfig);
+            Map<TableId, TableChanges.TableChange> schemas =
+                    customPostgresSchema.getTableSchema(Collections.singletonList(tableId));
+            return schemas.get(tableId);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
Review Comment:
**[Low]** `refreshSingleTableSchema` catches `Exception` and wraps it in
`RuntimeException`. Since this is called as the `pgSchemaRefresher` callback
from `PostgresDebeziumJsonDeserializer.deserialize()`, the `RuntimeException`
will bypass the `IOException` catch in the deserializer and propagate up to
`PipelineCoordinator.writeRecords()` where it's caught by the generic
`Exception` handler.
This is functionally safe (the batch will fail and retry), but consider
either:
- Throwing a checked exception type so the deserializer can handle it
explicitly, or
- Wrapping in `RuntimeException` with a more descriptive message that
includes the table ID and JDBC URL for debugging.
The current error message from the RuntimeException wrapper loses the table
context.
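A minimal sketch of the second option (a context-preserving wrapper; the helper name and message format are hypothetical):

```java
public class SchemaRefreshError {
    // Hypothetical helper: keep the RuntimeException, but make its message
    // name the failing table and job, so the generic Exception handler in
    // PipelineCoordinator.writeRecords() logs something actionable.
    public static RuntimeException wrapWithContext(
            Exception cause, String tableIdentifier, long jobId) {
        return new RuntimeException(
                "Failed to refresh PG schema for table " + tableIdentifier
                        + " (jobId=" + jobId + ")",
                cause);
    }

    public static void main(String[] args) {
        RuntimeException e =
                wrapWithContext(
                        new java.sql.SQLException("connection reset"), "public.orders", 42L);
        System.out.println(e.getMessage());
        System.out.println(e.getCause().getMessage());
    }
}
```

The catch block in `refreshSingleTableSchema` would then throw `wrapWithContext(e, tableId.identifier(), jobId)` instead of the bare `new RuntimeException(e)`.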
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java:
##########
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.SchemaChangeHelper;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP column only) by comparing
+ * the record's Kafka Connect schema field names with stored tableSchemas.
+ *
+ * <p>Because PostgreSQL does not emit DDL events in the WAL stream, schema detection is done by
+ * comparing the "after" struct field names in each DML record against the known column set.
+ *
+ * <p>Type comparison is intentionally skipped to avoid false positives caused by Kafka Connect type
+ * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column add or drop is
+ * detected, the accurate column types are fetched directly from PostgreSQL via the injected {@link
+ * #pgSchemaRefresher} callback.
+ *
+ * <p>MODIFY column type is not supported — users must manually execute ALTER TABLE ... MODIFY
+ * COLUMN in Doris when a PG column type changes.
+ */
+public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class);
+
+    /**
+     * Callback to fetch the current PG table schema for a single table via JDBC. Injected by {@link
+     * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader} after initialization.
+     */
+    private transient Function<TableId, TableChanges.TableChange> pgSchemaRefresher;
+
+    public void setPgSchemaRefresher(Function<TableId, TableChanges.TableChange> refresher) {
+        this.pgSchemaRefresher = refresher;
+    }
+
+    @Override
+    public DeserializeResult deserialize(Map<String, String> context, SourceRecord record)
+            throws IOException {
+        if (!RecordUtils.isDataChangeRecord(record)) {
+            return DeserializeResult.empty();
+        }
+
+        Schema valueSchema = record.valueSchema();
+        if (valueSchema == null) {
+            return super.deserialize(context, record);
+        }
+
+        Field afterField = valueSchema.field(Envelope.FieldName.AFTER);
+        if (afterField == null) {
+            return super.deserialize(context, record);
+        }
+
+        Schema afterSchema = afterField.schema();
+        TableId tableId = extractTableId(record);
+        TableChanges.TableChange stored = tableSchemas != null ? tableSchemas.get(tableId) : null;
+
+        // No baseline schema available — cannot detect changes, fall through to normal
+        // deserialization
+        if (stored == null || stored.getTable() == null) {
+            LOG.debug(
+                    "No stored schema for table {}, skipping schema change detection.",
+                    tableId.identifier());
+            return super.deserialize(context, record);
+        }
+
+        // First pass: name-only diff — fast, in-memory, no type comparison, no false positives
+        SchemaChangeHelper.SchemaDiff nameDiff =
+                SchemaChangeHelper.diffSchemaByName(afterSchema, stored);
+        if (nameDiff.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        Preconditions.checkNotNull(
+                pgSchemaRefresher,
+                "pgSchemaRefresher callback is not set. Cannot fetch fresh PG schema for change detection.");
+
+        // the last fresh schema
+        TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId);
+        if (fresh == null || fresh.getTable() == null) {
+            // Cannot proceed: DDL must be executed before the triggering DML record is written,
+            // otherwise new column data in this record would be silently dropped.
+            // Throwing here causes the batch to be retried from the same offset.
+            throw new IOException(
+                    "Failed to fetch fresh schema for table "
+                            + tableId.identifier()
+                            + "; cannot apply schema change safely. Will retry.");
+        }
+
+        // Second diff: use afterSchema as the source of truth for which columns the current WAL
+        // record is aware of. Only process additions/drops visible in afterSchema — columns that
+        // exist in fresh (JDBC) but are absent from afterSchema belong to a later DDL that has not
+        // yet produced a DML record, and will be processed when that DML record arrives.
+        //
+        // pgAdded:   present in afterSchema but absent from stored → look up Column in fresh for
+        //            accurate PG type metadata. If fresh doesn't have the column yet (shouldn't
+        //            happen normally), skip it.
+        // pgDropped: present in stored but absent from afterSchema.
+        List<Column> pgAdded = new ArrayList<>();
+        List<String> pgDropped = new ArrayList<>();
+
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                Column freshCol = fresh.getTable().columnWithName(field.name());
+                if (freshCol != null) {
+                    pgAdded.add(freshCol);
+                }
+            }
+        }
+
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                pgDropped.add(col.name());
+            }
+        }
+
+        // Second diff is empty: nameDiff was a false positive (PG schema unchanged vs stored).
+        // This happens when pgSchemaRefresher returns a schema ahead of the current WAL position
+        // (e.g. a later DDL was already applied in PG while we're still consuming older DML
+        // records).
+        // No DDL needed, no tableSchema update, no extra stream load — just process the DML
+        // normally.
+        if (pgAdded.isEmpty() && pgDropped.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        // Build updatedSchemas from fresh filtered to afterSchema columns only, so that the stored
+        // cache does not jump ahead to include columns not yet seen by any DML record. Those
+        // unseen columns will trigger their own schema change when their first DML record arrives.
+        TableEditor editor = fresh.getTable().edit();
+        for (Column col : fresh.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                editor.removeColumn(col.name());
+            }
+        }
+        TableChanges.TableChange filteredChange =
+                new TableChanges.TableChange(TableChanges.TableChangeType.ALTER, editor.create());
+        Map<TableId, TableChanges.TableChange> updatedSchemas = new HashMap<>();
+        updatedSchemas.put(tableId, filteredChange);
+
+        // Rename guard: simultaneous ADD+DROP may be a column RENAME — skip DDL to avoid data loss.
+        // Users must manually RENAME the column in Doris.
+        if (!pgAdded.isEmpty() && !pgDropped.isEmpty()) {
Review Comment:
**[Info — Good Design]** The rename guard (simultaneous ADD+DROP → skip DDL,
update cached schema) is a well-designed safety measure. The decision to still
return a `schemaChange` result with empty DDLs but updated schemas is correct —
it advances the cached schema to reflect PG reality without emitting
potentially destructive DDLs.
The warning log message is clear and actionable, telling the user to
manually rename the column in Doris.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java:
##########
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
+import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
+
+import org.apache.flink.cdc.connectors.base.utils.SerializerUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.debezium.document.Document;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import lombok.Getter;
+import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class providing common schema-tracking functionality for CDC source readers.
+ *
+ * <p>Handles serialization/deserialization of {@code tableSchemas} between FE and cdc_client, and
+ * provides a helper to load schemas from the incoming {@link JobBaseRecordRequest}.
+ */
+@Getter
+@Setter
+public abstract class AbstractCdcSourceReader implements SourceReader {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractCdcSourceReader.class);
+
+    protected static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    protected SourceRecordDeserializer<SourceRecord, DeserializeResult> serializer;
+    protected Map<TableId, TableChanges.TableChange> tableSchemas;
+
+    /**
+     * Load tableSchemas from a JSON string (produced by {@link #serializeTableSchemas()}). Used
+     * when a binlog/stream split is resumed from FE-persisted state.
+     *
+     * <p>Format: {@code [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+     */
+    public void loadTableSchemasFromJson(String json) throws IOException {
+        if (json == null || json.isEmpty()) {
+            return;
+        }
+        JsonNode root = OBJECT_MAPPER.readTree(json);
+        Map<TableId, TableChanges.TableChange> map = new ConcurrentHashMap<>();
+        DocumentReader docReader = DocumentReader.defaultReader();
+        for (JsonNode entry : root) {
+            boolean uc = entry.path("uc").asBoolean(false);
+            TableId tableId = TableId.parse(entry.get("i").asText(), uc);
+            Document doc = docReader.read(OBJECT_MAPPER.writeValueAsString(entry.get("c")));
+            TableChanges.TableChange change = FlinkJsonTableChangeSerializer.fromDocument(doc, uc);
+            map.put(tableId, change);
+        }
+        this.tableSchemas = map;
+        this.serializer.setTableSchemas(map);
+        LOG.info("Loaded {} table schemas from JSON", map.size());
+    }
+
+    /**
+     * Serialize current tableSchemas to a compact JSON string for FE persistence.
+     *
+     * <p>Stores the Debezium document as a nested JSON object (not a string) to avoid redundant
+     * escaping. Format: {@code
+     * [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+     */
+    @Override
+    public String serializeTableSchemas() {
+        if (tableSchemas == null || tableSchemas.isEmpty()) {
+            return null;
+        }
+        try {
+            DocumentWriter docWriter = DocumentWriter.defaultWriter();
+            ArrayNode result = OBJECT_MAPPER.createArrayNode();
+            for (Map.Entry<TableId, TableChanges.TableChange> e : tableSchemas.entrySet()) {
+                TableId tableId = e.getKey();
+                // useCatalogBeforeSchema: false when catalog is null but schema is set (e.g. PG)
+                boolean uc = SerializerUtils.shouldUseCatalogBeforeSchema(tableId);
+                ObjectNode entry = OBJECT_MAPPER.createObjectNode();
+                entry.put("i", tableId.toDoubleQuotedString());
+                entry.put("uc", uc);
+                // parse compact doc JSON into a JsonNode so "c" is a nested object, not a string
+                entry.set(
+                        "c",
+                        OBJECT_MAPPER.readTree(
+                                docWriter.write(TABLE_CHANGE_SERIALIZER.toDocument(e.getValue()))));
+                result.add(entry);
+            }
+            return OBJECT_MAPPER.writeValueAsString(result);
+        } catch (Exception e) {
Review Comment:
**[Low]** `serializeTableSchemas()` catches `Exception` and returns `null`
with a WARN log. This means if serialization fails (e.g., due to an unexpected
Debezium document format), the tableSchemas won't be persisted to FE in this
commit cycle.
On the next batch, `feHadNoSchema` will be true (since FE has no schema),
and `serializeTableSchemas()` will be called again. If the same serialization
error persists, schemas will never be persisted — but the in-memory schema will
still be correct for the current CDC client session.
This is acceptable for robustness (don't fail the whole batch for a
serialization issue), but consider logging at ERROR level since losing schema
persistence could cause schema re-detection on restart.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static utility class for executing DDL schema changes on the Doris FE via HTTP. */
+public class SchemaChangeManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
+    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String COLUMN_EXISTS_MSG = "Can not add column which already exists";
+    private static final String COLUMN_NOT_EXISTS_MSG = "Column does not exists";
+
+    private SchemaChangeManager() {}
+
+    /**
+     * Execute a list of DDL statements on FE. Each statement is sent independently.
+     *
+     * <p>Idempotent errors (ADD COLUMN when column already exists, DROP COLUMN when column does not
+     * exist) are logged as warnings and silently skipped, so retries on a different BE after a
+     * failed commitOffset do not cause infinite failures.
+     *
+     * @param feAddr Doris FE address (host:port)
+     * @param db target database
+     * @param token FE auth token
+     * @param sqls DDL statements to execute
+     */
+    public static void executeDdls(String feAddr, String db, String token, List<String> sqls)
+            throws IOException {
+        if (sqls == null || sqls.isEmpty()) {
+            LOG.info("No DDL statements to execute");
+            return;
+        }
+        for (String stmt : sqls) {
+            stmt = stmt.trim();
+            if (stmt.isEmpty()) {
+                continue;
+            }
+            LOG.info("Executing DDL on FE {}: {}", feAddr, stmt);
+            execute(feAddr, db, token, stmt);
+        }
+    }
+
+    /**
+     * Execute a single SQL statement via the FE query API.
+     *
+     * <p>Idempotent errors are swallowed with a warning; all other errors throw {@link
+     * IOException}.
+     */
+    public static void execute(String feAddr, String db, String token, String sql)
+            throws IOException {
+        HttpPost post = buildHttpPost(feAddr, db, token, sql);
+        String responseBody = handleResponse(post);
+        LOG.info("Executed DDL {} with response: {}", sql, responseBody);
+        parseResponse(sql, responseBody);
+    }
+
+    // ─── Internal helpers ─────────────────────────────────────────────────────
+
+    private static HttpPost buildHttpPost(String feAddr, String db, String token, String sql)
+            throws IOException {
+        String url = String.format(SCHEMA_CHANGE_API, feAddr, db);
+        Map<String, Object> bodyMap = new HashMap<>();
+        bodyMap.put("stmt", sql);
+        String body = OBJECT_MAPPER.writeValueAsString(bodyMap);
+
+        HttpPost post = new HttpPost(url);
+        post.setHeader("Content-Type", "application/json;charset=UTF-8");
+        post.setHeader("Authorization", HttpUtil.getAuthHeader());
+        post.setHeader("token", token);
+        post.setEntity(new StringEntity(body, "UTF-8"));
+        return post;
+    }
+
+    private static String handleResponse(HttpPost request) throws IOException {
+        try (CloseableHttpClient client = HttpUtil.getHttpClient();
Review Comment:
**[Low]** `handleResponse` creates a new `CloseableHttpClient` for each DDL
statement via `HttpUtil.getHttpClient()`. Since DDLs are rare events, this is
fine. But if multiple DDLs are generated in a single detection event (e.g.,
double ADD), each DDL gets its own HTTP client + connection. Consider passing
the client from the caller loop in `executeDdls()` to reuse the connection.
Not a functional issue — just a minor efficiency suggestion for the uncommon
multi-DDL case.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -309,15 +325,22 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
                 }
                 // Process data messages
-                List<String> serializedRecords =
-                        sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
+                DeserializeResult result =
+                        sourceReader.deserialize(deserializeContext, element);
+
+                if (result.getType() == DeserializeResult.Type.SCHEMA_CHANGE) {
+                    // Flush pending data before DDL
+                    batchStreamLoad.forceFlush();
+                    SchemaChangeManager.executeDdls(feAddr, targetDb, token, result.getDdls());
+                    hasExecuteDDL = true;
+                    sourceReader.applySchemaChange(result.getUpdatedSchemas());
+                    lastMessageIsHeartbeat = false;
+                }
+                if (!CollectionUtils.isEmpty(result.getRecords())) {
Review Comment:
**[Info — Good Design]** The flow here correctly handles the case where a
SCHEMA_CHANGE result also carries DML records:
1. `forceFlush()` — flush any pending data from previous records
2. `executeDdls()` — apply DDLs to Doris
3. `applySchemaChange()` — update in-memory cached schema
4. Fall through to `if (!CollectionUtils.isEmpty(result.getRecords()))` —
write the triggering DML record
This ordering ensures the DDL is applied before the DML that contains the
new columns, preventing data loss. The `if` (not `else if`) on line 339 is
intentional and correct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]