Copilot commented on code in PR #61267:
URL: https://github.com/apache/doris/pull/61267#discussion_r2923685139


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java:
##########
@@ -43,11 +43,33 @@ public class DataSourceConfigValidator {
             DataSourceConfigKeys.SSL_ROOTCERT
     );
 
+    // Known suffixes for per-table config keys (format: "table.<tableName>.<suffix>")
+    private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = Sets.newHashSet(
+            DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
+    );
+
+    private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + ".";
+
     public static void validateSource(Map<String, String> input) throws IllegalArgumentException {
         for (Map.Entry<String, String> entry : input.entrySet()) {
             String key = entry.getKey();
             String value = entry.getValue();
 
+            if (key.startsWith(TABLE_LEVEL_PREFIX)) {
+                // per-table config key must be exactly: table.<tableName>.<suffix>
+                // reject malformed keys like "table.exclude_columns" (missing tableName)
+                String[] parts = key.split("\\.", -1);
+                if (parts.length < 3 || parts[1].isEmpty()) {
+                    throw new IllegalArgumentException("Malformed per-table config key: '" + key
+                            + "'. Expected format: table.<tableName>.<suffix>");
+                }
+                String suffix = parts[parts.length - 1];
+                if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
+                    throw new IllegalArgumentException("Unknown per-table config key: '" + key + "'");
+                }

Review Comment:
   Per-table key validation claims the format must be exactly 
`table.<tableName>.<suffix>`, but the current split check allows extra dot 
segments (e.g. `table.my_table.extra.exclude_columns`) and even empty segments 
like `table.t..exclude_columns`. These malformed keys will pass validation yet 
won’t be parsed as intended (or will produce a mismatched tableName), leading 
to silent misconfiguration. Consider enforcing `parts.length == 3` (and 
`parts[1]` non-empty) for table-level keys and validating the value is 
non-blank.
   ```suggestion
                   // and keys with extra or empty segments like "table.t..exclude_columns"
                   String[] parts = key.split("\\.", -1);
                   if (parts.length != 3 || parts[1].isEmpty()) {
                       throw new IllegalArgumentException("Malformed per-table config key: '" + key
                               + "'. Expected format: table.<tableName>.<suffix>");
                   }
                   String suffix = parts[2];
                   if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
                       throw new IllegalArgumentException("Unknown per-table config key: '" + key + "'");
                   }
                   if (!isValidValue(key, value)) {
                       throw new IllegalArgumentException("Invalid value for key '" + key + "': " + value);
                   }
   ```
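
To make the difference between `parts.length < 3` and `parts.length != 3` concrete, here is a minimal standalone sketch of the strict check. The class name `TableKeyCheck`, the helper names, and the literal values `"table."` and `"exclude_columns"` are assumptions for illustration only; the real constants live in `DataSourceConfigKeys` and may differ.

```java
import java.util.Set;

final class TableKeyCheck {
    // Hypothetical stand-ins for the DataSourceConfigKeys constants (assumed values).
    private static final String TABLE_LEVEL_PREFIX = "table.";
    private static final Set<String> ALLOWED_SUFFIXES = Set.of("exclude_columns");

    /** Returns the table name of a well-formed per-table key, or throws. */
    static String tableNameOf(String key) {
        if (!key.startsWith(TABLE_LEVEL_PREFIX)) {
            throw new IllegalArgumentException("Not a per-table config key: '" + key + "'");
        }
        // A limit of -1 keeps empty segments, so "table.t." yields ["table", "t", ""].
        String[] parts = key.split("\\.", -1);
        // Exactly three segments: "table", a non-empty table name, and the suffix.
        if (parts.length != 3 || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Malformed per-table config key: '" + key + "'");
        }
        if (!ALLOWED_SUFFIXES.contains(parts[2])) {
            throw new IllegalArgumentException("Unknown per-table config key: '" + key + "'");
        }
        return parts[1];
    }

    /** Convenience wrapper that reports validity instead of throwing. */
    static boolean isValid(String key) {
        try {
            tableNameOf(key);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] keys = {
            "table.my_table.exclude_columns",
            "table.my_table.extra.exclude_columns",
            "table.t..exclude_columns",
            "table.exclude_columns"
        };
        for (String key : keys) {
            System.out.println(key + " -> " + isValid(key));
        }
    }
}
```

One trade-off of the strict check: a table name that itself contains a dot would be rejected, so whether that matters depends on how table names are written in these keys.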



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java:
##########
@@ -79,49 +82,60 @@ public class DebeziumJsonDeserializer
     private static ObjectMapper objectMapper = new ObjectMapper();
     @Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
     @Getter @Setter protected Map<TableId, TableChanges.TableChange> tableSchemas;
+    // Parsed exclude-column sets per table, populated once in init() from config
+    private Map<String, Set<String>> excludeColumnsCache = new HashMap<>();
 
     public DebeziumJsonDeserializer() {}
 
     @Override
     public void init(Map<String, String> props) {
         this.serverTimeZone =
                 ConfigUtil.getServerTimeZoneFromJdbcUrl(props.get(DataSourceConfigKeys.JDBC_URL));
+        excludeColumnsCache = ConfigUtil.parseAllExcludeColumns(props);
     }
 
     @Override
     public DeserializeResult deserialize(Map<String, String> context, SourceRecord record)
             throws IOException {
         if (RecordUtils.isDataChangeRecord(record)) {
             LOG.trace("Process data change record: {}", record);
-            List<String> rows = deserializeDataChangeRecord(record);
+            List<String> rows = deserializeDataChangeRecord(record, context);
             return DeserializeResult.dml(rows);
         } else {
             return DeserializeResult.empty();
         }
     }
 
-    private List<String> deserializeDataChangeRecord(SourceRecord record) throws IOException {
+    private List<String> deserializeDataChangeRecord(
+            SourceRecord record, Map<String, String> context) throws IOException {
         List<String> rows = new ArrayList<>();
+        String tableName = extractTableName(record);
+        Set<String> excludeColumns = excludeColumnsCache.getOrDefault(tableName, Collections.emptySet());

Review Comment:
   `deserializeDataChangeRecord(SourceRecord record, Map<String, String> 
context)` no longer uses the `context` parameter (exclude columns come from 
`excludeColumnsCache`, and table name is extracted from the record). Consider 
dropping the unused parameter (and the corresponding argument at the call site) 
to avoid confusion about whether runtime context can affect DML filtering.



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -319,6 +321,16 @@ public static List<CreateTableCommand> generateCreateTableCmds(String targetDb,
             if (primaryKeys.isEmpty()) {
                 noPrimaryKeyTables.add(table);
             }
+
+            // Validate and apply exclude_columns for this table
+            Set<String> excludeColumns = parseExcludeColumns(properties, table);
+            if (!excludeColumns.isEmpty()) {
+                validateExcludeColumns(excludeColumns, table, columns, primaryKeys);
+                columns = columns.stream()
+                        .filter(col -> !excludeColumns.contains(col.getName()))
+                        .collect(Collectors.toList());
+            }

Review Comment:
   `exclude_columns` is applied only after calling `getColumns(...)`, but 
`getColumns(...)` currently validates every upstream column type (and can throw 
on unsupported types) before you filter excluded columns. This makes it 
impossible to use `exclude_columns` to skip problematic/unsupported upstream 
columns, which is a common reason to exclude columns. Consider parsing 
`exclude_columns` before `getColumns(...)` and filtering the raw JDBC column 
list before running per-column validations / type adjustments.
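
The ordering proposed above (filter first, validate second) could look roughly like the sketch below. This is not the actual `StreamingJobUtils` code: `RawColumn`, `SUPPORTED_TYPES`, `filterExcluded`, `validateTypes`, and `prepare` are hypothetical names, and the supported-type list is made up for the example.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class ExcludeBeforeValidate {
    /** Hypothetical raw column descriptor as it might come from JDBC metadata. */
    static final class RawColumn {
        final String name;
        final String jdbcType;

        RawColumn(String name, String jdbcType) {
            this.name = name;
            this.jdbcType = jdbcType;
        }
    }

    // Assumed set of supported upstream types, for illustration only.
    private static final Set<String> SUPPORTED_TYPES = Set.of("INT", "BIGINT", "VARCHAR");

    /** Drops excluded columns from the raw list before any per-column checks run. */
    static List<RawColumn> filterExcluded(List<RawColumn> raw, Set<String> exclude) {
        return raw.stream()
                .filter(col -> !exclude.contains(col.name))
                .collect(Collectors.toList());
    }

    /** Throws on the first column whose type is not in SUPPORTED_TYPES. */
    static void validateTypes(List<RawColumn> columns) {
        for (RawColumn col : columns) {
            if (!SUPPORTED_TYPES.contains(col.jdbcType)) {
                throw new IllegalArgumentException(
                        "Unsupported type " + col.jdbcType + " for column " + col.name);
            }
        }
    }

    /** Filter first, then validate, so an excluded column can never fail validation. */
    static List<RawColumn> prepare(List<RawColumn> raw, Set<String> exclude) {
        List<RawColumn> kept = filterExcluded(raw, exclude);
        validateTypes(kept);
        return kept;
    }
}
```

With this ordering, a table that has an unsupported column (say a hypothetical `geom` column of type `GEOMETRY`) passes as long as `geom` is listed in `exclude_columns`, and still fails if it is not.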



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

