github-actions[bot] commented on code in PR #61182:
URL: https://github.com/apache/doris/pull/61182#discussion_r2910789232


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java:
##########
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */
+public class SchemaChangeHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    private SchemaChangeHelper() {}
+
+    // ─── Schema diff result 
────────────────────────────────────────────────────
+
+    /**
+     * Holds the result of a full schema comparison between an after-schema 
and stored TableChange.
+     */
+    public static class SchemaDiff {
+        /** Fields present in afterSchema but absent from stored. */
+        public final List<Field> added;
+
+        /** Column names present in stored but absent from afterSchema. */
+        public final List<String> dropped;
+
+        /** Same-named columns whose Doris type or default value differs. */
+        public final Map<String, Field> modified;
+
+        public SchemaDiff(List<Field> added, List<String> dropped, Map<String, 
Field> modified) {
+            this.added = added;
+            this.dropped = dropped;
+            this.modified = modified;
+        }
+
+        public boolean isEmpty() {
+            return added.isEmpty() && dropped.isEmpty() && modified.isEmpty();
+        }
+    }
+
+    // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) 
──────
+
+    /**
+     * Name-only schema diff: compare field names in {@code afterSchema} 
against the stored {@link
+     * TableChanges.TableChange}, detecting added and dropped columns by name 
only.
+     *
+     * <p>Only support add and drop and not support modify and rename
+     *
+     * <p>When {@code stored} is null or empty, both lists are empty (no 
baseline to diff against).
+     */
+    public static SchemaDiff diffSchemaByName(Schema afterSchema, 
TableChanges.TableChange stored) {
+        List<Field> added = new ArrayList<>();
+        List<String> dropped = new ArrayList<>();
+
+        if (afterSchema == null || stored == null || stored.getTable() == 
null) {
+            return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+        }
+
+        // Detect added: fields present in afterSchema but absent from stored
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                added.add(field);
+            }
+        }
+
+        // Detect dropped: columns present in stored but absent from 
afterSchema
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                dropped.add(col.name());
+            }
+        }
+
+        return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+    }
+
+    // ─── Quoting helpers 
──────────────────────────────────────────────────────
+
+    /** Wrap a name in backticks if not already quoted. */
+    public static String identifier(String name) {
+        if (name.startsWith("`") && name.endsWith("`")) {
+            return name;
+        }
+        return "`" + name + "`";
+    }
+
+    /** Return a fully-qualified {@code `db`.`table`} identifier string. */
+    public static String quoteTableIdentifier(String db, String table) {
+        return identifier(db) + "." + identifier(table);
+    }
+
+    /**
+     * Format a default value (already a plain Java string, not a raw SQL 
expression) into a form
+     * suitable for a Doris {@code DEFAULT} clause.
+     *
+     * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. 
obtained from the
+     * Kafka Connect schema via {@code 
field.schema().defaultValue().toString()} — rather than a raw
+     * PG SQL expression. This avoids having to strip PG-specific type casts 
({@code ::text}, etc.).
+     *
+     * <ul>
+     *   <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code 
TRUE}, {@code FALSE}) are
+     *       returned as-is.
+     *   <li>Numeric literals are returned as-is (no quotes).
+     *   <li>Everything else is wrapped in single quotes.
+     * </ul>
+     */
+    public static String quoteDefaultValue(String defaultValue) {
+        if (defaultValue == null) {
+            return null;
+        }
+        if (defaultValue.equalsIgnoreCase("current_timestamp")
+                || defaultValue.equalsIgnoreCase("null")
+                || defaultValue.equalsIgnoreCase("true")
+                || defaultValue.equalsIgnoreCase("false")) {
+            return defaultValue;
+        }
+        try {
+            Double.parseDouble(defaultValue);
+            return defaultValue;
+        } catch (NumberFormatException ignored) {
+            // fall through
+        }
+        return "'" + defaultValue + "'";
+    }

Review Comment:
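   **Bug: SQL injection / malformed SQL** — If `defaultValue` contains a single
   quote (e.g., a deserialized PG default such as `it's a value`, or any other
   string with embedded quotes), the result `'it's a value'` is malformed SQL,
   and a crafted default can terminate the literal early and inject SQL.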
   
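   Suggested fix — escape backslashes and single quotes before wrapping
   (backslashes first; otherwise a value ending in `\` would escape the closing
   quote):
   ```java
   return "'" + defaultValue.replace("\\", "\\\\").replace("'", "\\'") + "'";
   ```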
   
   Also consider adding a unit test for `quoteDefaultValue` with edge cases 
(embedded quotes, empty string, numeric strings, SQL keywords).



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java:
##########
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.SchemaChangeHelper;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP 
column only) by comparing
+ * the record's Kafka Connect schema field names with stored tableSchemas.
+ *
+ * <p>Because PostgreSQL does not emit DDL events in the WAL stream, schema 
detection is done by
+ * comparing the "after" struct field names in each DML record against the 
known column set.
+ *
+ * <p>Type comparison is intentionally skipped to avoid false positives caused 
by Kafka Connect type
+ * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column 
add or drop is
+ * detected, the accurate column types are fetched directly from PostgreSQL 
via the injected {@link
+ * #pgSchemaRefresher} callback.
+ *
+ * <p>MODIFY column type is not supported — users must manually execute ALTER 
TABLE ... MODIFY
+ * COLUMN in Doris when a PG column type changes.
+ */
+public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer 
{
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class);
+
+    /**
+     * Callback to fetch the current PG table schema for a single table via 
JDBC. Injected by {@link
+     * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader} 
after initialization.
+     */
+    private transient Function<TableId, TableChanges.TableChange> 
pgSchemaRefresher;
+
+    public void setPgSchemaRefresher(Function<TableId, 
TableChanges.TableChange> refresher) {
+        this.pgSchemaRefresher = refresher;
+    }
+
+    @Override
+    public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
+            throws IOException {
+        if (!RecordUtils.isDataChangeRecord(record)) {
+            return DeserializeResult.empty();
+        }
+
+        Schema valueSchema = record.valueSchema();
+        if (valueSchema == null) {
+            return super.deserialize(context, record);
+        }
+
+        Field afterField = valueSchema.field(Envelope.FieldName.AFTER);
+        if (afterField == null) {
+            return super.deserialize(context, record);
+        }
+
+        Schema afterSchema = afterField.schema();
+        TableId tableId = extractTableId(record);
+        TableChanges.TableChange stored = tableSchemas != null ? 
tableSchemas.get(tableId) : null;
+
+        // No baseline schema available — cannot detect changes, fall through 
to normal
+        // deserialization
+        if (stored == null || stored.getTable() == null) {
+            LOG.debug(
+                    "No stored schema for table {}, skipping schema change 
detection.",
+                    tableId.identifier());
+            return super.deserialize(context, record);
+        }
+
+        // First pass: name-only diff — fast, in-memory, no type comparison, 
no false positives
+        SchemaChangeHelper.SchemaDiff nameDiff =
+                SchemaChangeHelper.diffSchemaByName(afterSchema, stored);
+        if (nameDiff.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        Preconditions.checkNotNull(
+                pgSchemaRefresher,
+                "pgSchemaRefresher callback is not set. Cannot fetch fresh PG 
schema for change detection.");
+
+        // the last fresh schema
+        TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId);
+
+        // Second diff: use afterSchema as the source of truth for which 
columns the current WAL
+        // record is aware of. Only process additions/drops visible in 
afterSchema — columns that
+        // exist in fresh (JDBC) but are absent from afterSchema belong to a 
later DDL that has not
+        // yet produced a DML record, and will be processed when that DML 
record arrives.
+        //
+        // pgAdded: present in afterSchema but absent from stored → look up 
Column in fresh for
+        //          accurate PG type metadata. If fresh doesn't have the 
column yet (shouldn't
+        //          happen normally), skip it.
+        // pgDropped: present in stored but absent from afterSchema.
+        List<Column> pgAdded = new ArrayList<>();
+        List<String> pgDropped = new ArrayList<>();
+
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                Column freshCol = 
fresh.getTable().columnWithName(field.name());

Review Comment:
   **Bug: Potential NPE** — `refreshSingleTableSchema()` in
   `PostgresSourceReader` (the callback injected as `pgSchemaRefresher`) returns
   `schemas.get(tableId)`, which can be `null` if the table was dropped between
   the initial detection and the JDBC refresh. In that case `fresh` is `null`
   here, and calling `fresh.getTable()` throws an NPE.
   
   Suggested fix:
   ```java
   if (fresh == null || fresh.getTable() == null) {
       LOG.warn(
               "Table {} no longer exists in source after JDBC refresh, skipping schema change",
               tableId.identifier());
       return DeserializeResult.empty();
   }
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java:
##########
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */
+public class SchemaChangeHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    private SchemaChangeHelper() {}
+
+    // ─── Schema diff result 
────────────────────────────────────────────────────
+
+    /**
+     * Holds the result of a full schema comparison between an after-schema 
and stored TableChange.
+     */
+    public static class SchemaDiff {
+        /** Fields present in afterSchema but absent from stored. */
+        public final List<Field> added;
+
+        /** Column names present in stored but absent from afterSchema. */
+        public final List<String> dropped;
+
+        /** Same-named columns whose Doris type or default value differs. */
+        public final Map<String, Field> modified;
+
+        public SchemaDiff(List<Field> added, List<String> dropped, Map<String, 
Field> modified) {
+            this.added = added;
+            this.dropped = dropped;
+            this.modified = modified;
+        }
+
+        public boolean isEmpty() {
+            return added.isEmpty() && dropped.isEmpty() && modified.isEmpty();
+        }
+    }
+
+    // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) 
──────
+
+    /**
+     * Name-only schema diff: compare field names in {@code afterSchema} 
against the stored {@link
+     * TableChanges.TableChange}, detecting added and dropped columns by name 
only.
+     *
+     * <p>Only support add and drop and not support modify and rename
+     *
+     * <p>When {@code stored} is null or empty, both lists are empty (no 
baseline to diff against).
+     */
+    public static SchemaDiff diffSchemaByName(Schema afterSchema, 
TableChanges.TableChange stored) {
+        List<Field> added = new ArrayList<>();
+        List<String> dropped = new ArrayList<>();
+
+        if (afterSchema == null || stored == null || stored.getTable() == 
null) {
+            return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+        }
+
+        // Detect added: fields present in afterSchema but absent from stored
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                added.add(field);
+            }
+        }
+
+        // Detect dropped: columns present in stored but absent from 
afterSchema
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                dropped.add(col.name());
+            }
+        }
+
+        return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+    }
+
+    // ─── Quoting helpers 
──────────────────────────────────────────────────────
+
+    /** Wrap a name in backticks if not already quoted. */
+    public static String identifier(String name) {
+        if (name.startsWith("`") && name.endsWith("`")) {
+            return name;
+        }
+        return "`" + name + "`";
+    }

Review Comment:
   **Bug: Malformed SQL with backtick in identifier** — If a column or table 
name contains a backtick (rare but legal in PG), this produces malformed Doris 
SQL: `` `a`b` ``.
   
   Suggested fix — escape internal backticks:
   ```java
   return "`" + name.replace("`", "``") + "`";
   ```
   This follows the standard SQL identifier escaping convention (doubling the 
quote character).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to