Copilot commented on code in PR #4310:
URL: https://github.com/apache/flink-cdc/pull/4310#discussion_r2910557590
##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########
@@ -92,6 +92,17 @@ class PostTransformOperatorTest {
                    .physicalColumn("__table_name__", DataTypes.STRING().notNull())
                    .primaryKey("col1")
                    .build();
+    private static final Schema METADATA_SCHEMA_UPPER_CASE =
+            Schema.newBuilder()
+                    .physicalColumn("col1".toUpperCase(), DataTypes.STRING().notNull())
+                    .primaryKey("col1".toUpperCase())
+                    .build();
+    private static final Schema METADATA_SCHEMA_DUPLICATE_COLUMN_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
+                    .physicalColumn("col1".toUpperCase(), DataTypes.STRING().notNull())
+                    .primaryKey("col1".toUpperCase())
+                    .build();
Review Comment:
`METADATA_SCHEMA_DUPLICATE_COLUMN_SCHEMA` is redundant/awkward
(“DUPLICATE_COLUMN_SCHEMA”). Consider renaming to something clearer like
`METADATA_SCHEMA_WITH_DUPLICATE_COLUMNS` (or similar) to better convey intent.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -194,64 +224,65 @@ private void processElementInternal(StreamRecord<Event> element) {
    // -------------------
    /**
-     * Apply effective transform rules to {@link CreateTableEvent}s based on effective transformers.
+     * Apply effective transform rule to {@link CreateTableEvent}s based on effective transformer.
     */
    private Optional<Event> processCreateTableEvent(
-            CreateTableEvent event, List<PostTransformer> effectiveTransformers) {
+            CreateTableEvent event, PostTransformer effectiveTransformer) {
        TableId tableId = event.tableId();
        Schema preSchema = event.getSchema();
-        // Apply transform rules and verify we can get a deterministic post schema
-        List<Schema> schemas =
-                effectiveTransformers.stream()
-                        .map(trans -> transformSchema(preSchema, trans))
-                        .collect(Collectors.toList());
-
        Schema postSchema =
-                SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
+                SchemaUtils.ensurePkNonNull(transformSchema(preSchema, effectiveTransformer));
        // Update transform info map
        postTransformInfoMap.put(
                tableId, PostTransformChangeInfo.of(tableId, preSchema, postSchema));
        // Update "if-table-has-been–wildcard–matched" map
        boolean wildcardMatched =
-                effectiveTransformers.stream()
-                        .map(PostTransformer::getProjection)
-                        .flatMap(this::optionalToStream)
-                        .map(TransformProjection::getProjection)
-                        .anyMatch(TransformParser::hasAsterisk);
+                effectiveTransformer.getProjection().isPresent()
+                        && TransformParser.hasAsterisk(
+                                effectiveTransformer.getProjection().get().getProjection());
        hasAsteriskMap.put(tableId, wildcardMatched);
        projectedColumnsMap.put(
                tableId,
                preSchema.getColumnNames().stream()
                        .filter(postSchema.getColumnNames()::contains)
                        .collect(Collectors.toList()));
-        return Optional.of(new CreateTableEvent(tableId, postSchema));
+        // Apply all effective post-converters
+        Optional<SchemaChangeEvent> createTableEvent =
+                Optional.of(new CreateTableEvent(tableId, postSchema));
+        return createTableEvent.map(Event.class::cast);
Review Comment:
The comment says “Apply all effective post-converters”, but no converter
logic is applied here (it only wraps `CreateTableEvent` in an `Optional`).
Either implement the intended converter application for `CreateTableEvent` (if
applicable) or remove/adjust the comment and simplify the return to avoid
misleading future readers.
```suggestion
        // Create transformed CreateTableEvent based on post-transformed schema
        return Optional.of((Event) new CreateTableEvent(tableId, postSchema));
```
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -194,64 +224,65 @@ private void processElementInternal(StreamRecord<Event> element) {
    // -------------------
    /**
-     * Apply effective transform rules to {@link CreateTableEvent}s based on effective transformers.
+     * Apply effective transform rule to {@link CreateTableEvent}s based on effective transformer.
     */
    private Optional<Event> processCreateTableEvent(
-            CreateTableEvent event, List<PostTransformer> effectiveTransformers) {
+            CreateTableEvent event, PostTransformer effectiveTransformer) {
        TableId tableId = event.tableId();
        Schema preSchema = event.getSchema();
-        // Apply transform rules and verify we can get a deterministic post schema
-        List<Schema> schemas =
-                effectiveTransformers.stream()
-                        .map(trans -> transformSchema(preSchema, trans))
-                        .collect(Collectors.toList());
-
        Schema postSchema =
-                SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
+                SchemaUtils.ensurePkNonNull(transformSchema(preSchema, effectiveTransformer));
        // Update transform info map
        postTransformInfoMap.put(
                tableId, PostTransformChangeInfo.of(tableId, preSchema, postSchema));
        // Update "if-table-has-been–wildcard–matched" map
        boolean wildcardMatched =
-                effectiveTransformers.stream()
-                        .map(PostTransformer::getProjection)
-                        .flatMap(this::optionalToStream)
-                        .map(TransformProjection::getProjection)
-                        .anyMatch(TransformParser::hasAsterisk);
+                effectiveTransformer.getProjection().isPresent()
+                        && TransformParser.hasAsterisk(
+                                effectiveTransformer.getProjection().get().getProjection());
        hasAsteriskMap.put(tableId, wildcardMatched);
        projectedColumnsMap.put(
                tableId,
                preSchema.getColumnNames().stream()
                        .filter(postSchema.getColumnNames()::contains)
                        .collect(Collectors.toList()));
-        return Optional.of(new CreateTableEvent(tableId, postSchema));
+        // Apply all effective post-converters
+        Optional<SchemaChangeEvent> createTableEvent =
+                Optional.of(new CreateTableEvent(tableId, postSchema));
+        return createTableEvent.map(Event.class::cast);
    }
    /**
     * Apply effective transform rules to other {@link SchemaChangeEvent}s based on effective
     * transformers and existing {@link PostTransformChangeInfo}.
     */
    private Optional<Event> processSchemaChangeEvent(
-            SchemaChangeEvent event, List<PostTransformer> effectiveTransformers) {
+            SchemaChangeEvent event, PostTransformer transformer) {
+        // CreateTableEvents should be handled in `processCreateTableEvent` method
+        Preconditions.checkArgument(
+                !(event instanceof CreateTableEvent),
+                "Unexpected CreateTableEvents in processSchemaChangeEvent method: %s",
+                event);
        TableId tableId = event.tableId();
+        if (!postTransformInfoMap.containsKey(tableId)) {
+            LOG.warn(
+                    "Met dangling schema change event {}, Table {} might have been dropped.",
Review Comment:
The log message is grammatically awkward (“Met dangling ...”). Consider
rephrasing to “Encountered a dangling schema change event ..., table ... might
have been dropped.” to improve clarity.
```suggestion
                    "Encountered a dangling schema change event {}, table {} might have been dropped.",
```
##########
docs/content.zh/docs/core-concept/transform.md:
##########
@@ -346,14 +346,14 @@ transform:
transform:
- source-table: mydb.web_order
projection: id, order_id
- filter: UPPER(province) = 'SHANGHAI'
description: classification mapping example
- - source-table: mydb.web_order
+ - source-table: mydb.\.*
projection: order_id as id, id as order_id
- filter: UPPER(province) = 'BEIJING'
description: classification mapping example
```
+这里 `mydb.web_order` 将被第一条规则处理,而 `mydb` 里的所有其他表将被第二条规则处理。
Review Comment:
Same issue as in the English docs: the example is still described as "classification mapping example", but the current rules no longer demonstrate classification mapping (which was previously driven by different filters). Consider updating the descriptions to emphasize "the first matching rule per source-table takes effect", or rewriting the example as a single rule using `CASE WHEN` to show the recommended way to express classification mapping.
##########
docs/content/docs/core-concept/transform.md:
##########
@@ -349,14 +349,14 @@ For example, we may define a transform rule as follows:
transform:
- source-table: mydb.web_order
projection: id, order_id
- filter: UPPER(province) = 'SHANGHAI'
description: classification mapping example
- - source-table: mydb.web_order
+ - source-table: mydb.\.*
projection: order_id as id, id as order_id
- filter: UPPER(province) = 'BEIJING'
description: classification mapping example
```
+Here `mydb.web_order` will be handled in the first rule, while other tables in `mydb` will fall into the second one.
Review Comment:
This example is labeled “classification mapping example” but the
classification behavior (previously driven by different filters) is no longer
present—both rules are now unconditional and only differ by
selector/projection. Consider either updating the descriptions to reflect
“first-match routing by table pattern”, or rewriting the example to demonstrate
classification via a single rule using `CASE WHEN` (which matches the new
recommended approach used in tests).
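One way the single-rule rewrite could look (a sketch only: the `province` and `category` column names are illustrative, and the exact `CASE WHEN` projection syntax should be checked against the transform documentation before adopting it):

```yaml
transform:
  - source-table: mydb.web_order
    # Classification is expressed inside the projection rather than via
    # per-rule filters, so a single rule covers all classes of rows.
    projection: id, order_id, CASE WHEN UPPER(province) = 'SHANGHAI' THEN 'shanghai' WHEN UPPER(province) = 'BEIJING' THEN 'beijing' ELSE 'other' END AS category
    description: classification mapping via CASE WHEN in a single rule
```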
--
This is an automated message from the Apache Git Service.