Copilot commented on code in PR #4319:
URL: https://github.com/apache/flink-cdc/pull/4319#discussion_r2943908565
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -289,40 +289,71 @@ private Optional<Event> processDataChangeEvent(
getProjectionProcessor(tableId, effectiveTransformer);
TransformFilterProcessor filterProcessor =
getFilterProcessor(tableId, effectiveTransformer);
- RecordData beforeRow = null;
- RecordData afterRow = null;
- boolean filterPassed = true;
+
+ BinaryRecordData beforeRow = null;
+ BinaryRecordData afterRow = null;
+ boolean beforeFilterPassed = false;
+ boolean afterFilterPassed = false;
+
if (event.before() != null) {
context.opType = beforeOp;
Tuple2<BinaryRecordData, Boolean> result =
transformRecord(
event.before(), info, projectionProcessor,
filterProcessor, context);
beforeRow = result.f0;
- filterPassed = result.f1;
+ beforeFilterPassed = result.f1;
}
if (event.after() != null) {
context.opType = afterOp;
Tuple2<BinaryRecordData, Boolean> result =
transformRecord(
event.after(), info, projectionProcessor,
filterProcessor, context);
afterRow = result.f0;
- filterPassed = result.f1;
+ afterFilterPassed = result.f1;
}
- if (filterPassed) {
- DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
- if (effectiveTransformer.getPostTransformConverter().isPresent()) {
- return effectiveTransformer
- .getPostTransformConverter()
- .get()
- .convert(finalEvent)
- .map(Event.class::cast);
- } else {
- return Optional.of(finalEvent);
- }
+ // For UPDATE events, before and after filter results may differ, requiring op type conversion:
+ // before=Y, after=Y -> UPDATE; before=Y, after=N -> DELETE;
+ // before=N, after=Y -> INSERT; before=N, after=N -> drop.
+ DataChangeEvent finalEvent;
+ switch (event.op()) {
+ case INSERT:
+ case REPLACE:
+ if (!afterFilterPassed) {
+ return Optional.empty();
+ }
+ finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
+ break;
+ case DELETE:
+ if (!beforeFilterPassed) {
+ return Optional.empty();
+ }
+ finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
+ break;
+ case UPDATE:
+ if (beforeFilterPassed && afterFilterPassed) {
+ finalEvent = DataChangeEvent.projectRecords(event, beforeRow, afterRow);
+ } else if (beforeFilterPassed) {
+ finalEvent = DataChangeEvent.deleteEvent(tableId, beforeRow, event.meta());
+ } else if (afterFilterPassed) {
Review Comment:
When an UPDATE is converted into a synthetic INSERT/DELETE (based on
before/after filter results), the projected row was computed with
`context.opType` set to `-U/+U` (from `event.opTypeString(...)`). Any
projection/UDF that references the built-in metadata column
`__data_event_type__` will therefore emit the wrong value (e.g. `+U` instead of
`+I`, `-U` instead of `-D`) for these converted events. Consider recomputing
the projected row for the emitted event with `context.opType` set to the final
operation type (without re-evaluating the filter), or otherwise ensuring
`__data_event_type__` matches the converted op.
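The conversion table under review can be modeled as a small, self-contained sketch, independent of Flink CDC's actual classes (the class, enum, and method names below are illustrative, not part of the PR):

```java
import java.util.Optional;

/** Illustrative model of the UPDATE op-type conversion table; names are hypothetical. */
public class UpdateOpConversion {
    public enum Op { INSERT, UPDATE, DELETE }

    /**
     * Maps the before/after filter results of an UPDATE event to the operation
     * type that should be emitted, and that any projection reading the
     * __data_event_type__ metadata column should observe.
     */
    public static Optional<Op> convert(boolean beforePassed, boolean afterPassed) {
        if (beforePassed && afterPassed) {
            return Optional.of(Op.UPDATE); // both rows match: keep the -U/+U pair
        } else if (beforePassed) {
            return Optional.of(Op.DELETE); // only the old row matches: emit -D
        } else if (afterPassed) {
            return Optional.of(Op.INSERT); // only the new row matches: emit +I
        }
        return Optional.empty(); // neither row matches: drop the event
    }
}
```

The point of the review comment is that the projection must be evaluated (or re-evaluated) against the op type returned by this table, not against the original `-U/+U` of the incoming UPDATE.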
##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########
@@ -784,6 +784,91 @@ void testDataChangeEventTransformWithDuplicateColumns() throws Exception {
transformFunctionEventEventOperatorTestHarness.close();
}
+ @Test
+ void testUpdateEventFilterOpTypeConversion() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ COLUMN_SQUARE_TABLE.identifier(),
+ "col1, col2, col2 * col2 as square_col2",
+ "col2 < 3 OR col2 > 5")
+ .build();
Review Comment:
This new test validates UPDATE op-type conversion, but it doesn’t cover
correctness of op-type dependent projections (e.g. expressions using the
built-in metadata column `__data_event_type__`). Adding that column to the
projection (or a dedicated assertion) would prevent regressions where converted
INSERT/DELETE events still carry `+U/-U` in projected results.
##########
docs/content.zh/docs/core-concept/transform.md:
##########
@@ -337,23 +337,6 @@ transform:
```
小技巧:table-options 的格式是 `key1=value1,key2=value2`。
-## 分类映射
-多个转换规则可以定义为分类映射。
-只有第一个匹配的转换规则将应用。
-举个例子,我们可以定义一个转换规则如下:
-
-```yaml
-transform:
- - source-table: mydb.web_order
- projection: id, order_id
- filter: UPPER(province) = 'SHANGHAI'
- description: classification mapping example
- - source-table: mydb.web_order
- projection: order_id as id, id as order_id
- filter: UPPER(province) = 'BEIJING'
- description: classification mapping example
-```
-
## 用户自定义函数
用户自定义函数(UDF)可以在转换规则中使用。
Review Comment:
This removes the note that "only the first matching transform rule will be applied", but the implementation still uses first-match semantics (for a given table, only the first matching rule is applied). Consider keeping this behavior description in the docs, or restating it elsewhere; otherwise users configuring multiple rules for the same table may be misled.
##########
docs/content/docs/core-concept/transform.md:
##########
@@ -340,23 +340,6 @@ transform:
```
Tips: The format of table-options is `key1=value1,key2=value2`.
Review Comment:
This section removed the documentation that transform rules are evaluated
with a first-match policy (only the first matching rule applies). The
implementation still uses first-match semantics (e.g., iterating transformers
and returning the first selector match), so the docs should keep describing
this behavior or relocate it elsewhere to avoid misleading users configuring
multiple rules for the same table.
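The first-match semantics described above can be sketched in isolation; the types and names below are hypothetical stand-ins for the real transformer/selector classes:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Illustrative sketch of first-match rule selection; types and names are hypothetical. */
public class FirstMatchRules {
    /** A transform rule: a table selector plus a label standing in for its projection/filter. */
    public record Rule(Predicate<String> selector, String description) {}

    /** Returns the first rule whose selector matches the table id; later matches are ignored. */
    public static Optional<Rule> findRule(List<Rule> rules, String tableId) {
        return rules.stream().filter(r -> r.selector().test(tableId)).findFirst();
    }
}
```

Under this policy, two rules targeting the same `source-table` never both apply; the second is silently skipped, which is exactly the behavior the deleted doc section explained.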
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]