yuxiqian commented on code in PR #4319:
URL: https://github.com/apache/flink-cdc/pull/4319#discussion_r2943971227
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -289,40 +289,71 @@ private Optional<Event> processDataChangeEvent(
getProjectionProcessor(tableId, effectiveTransformer);
TransformFilterProcessor filterProcessor =
getFilterProcessor(tableId, effectiveTransformer);
- RecordData beforeRow = null;
- RecordData afterRow = null;
- boolean filterPassed = true;
+
+ BinaryRecordData beforeRow = null;
+ BinaryRecordData afterRow = null;
+ boolean beforeFilterPassed = false;
+ boolean afterFilterPassed = false;
+
if (event.before() != null) {
context.opType = beforeOp;
Tuple2<BinaryRecordData, Boolean> result =
transformRecord(
event.before(), info, projectionProcessor,
filterProcessor, context);
beforeRow = result.f0;
- filterPassed = result.f1;
+ beforeFilterPassed = result.f1;
}
if (event.after() != null) {
context.opType = afterOp;
Tuple2<BinaryRecordData, Boolean> result =
transformRecord(
event.after(), info, projectionProcessor,
filterProcessor, context);
afterRow = result.f0;
- filterPassed = result.f1;
+ afterFilterPassed = result.f1;
}
- if (filterPassed) {
- DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event,
beforeRow, afterRow);
- if (effectiveTransformer.getPostTransformConverter().isPresent()) {
- return effectiveTransformer
- .getPostTransformConverter()
- .get()
- .convert(finalEvent)
- .map(Event.class::cast);
- } else {
- return Optional.of(finalEvent);
- }
+ // For UPDATE events, before and after filter results may differ,
requiring op type
+ // conversion:
+ // before=Y, after=Y -> UPDATE; before=Y, after=N -> DELETE;
+ // before=N, after=Y -> INSERT; before=N, after=N -> drop.
+ DataChangeEvent finalEvent;
+ switch (event.op()) {
+ case INSERT:
+ case REPLACE:
+ if (!afterFilterPassed) {
+ return Optional.empty();
+ }
+ finalEvent = DataChangeEvent.projectRecords(event, beforeRow,
afterRow);
+ break;
+ case DELETE:
+ if (!beforeFilterPassed) {
+ return Optional.empty();
+ }
+ finalEvent = DataChangeEvent.projectRecords(event, beforeRow,
afterRow);
+ break;
+ case UPDATE:
+ if (beforeFilterPassed && afterFilterPassed) {
+ finalEvent = DataChangeEvent.projectRecords(event,
beforeRow, afterRow);
+ } else if (beforeFilterPassed) {
+ finalEvent = DataChangeEvent.deleteEvent(tableId,
beforeRow, event.meta());
+ } else if (afterFilterPassed) {
Review Comment:
I prefer not to re-evaluate expressions as the filter condition itself may
depend on the `opType` itself, and that might cause inconsistencies.
Also, `op_type` should represent the original type from source, and keeping
it intact should be acceptable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]