xuyangzhong commented on code in PR #27111:
URL: https://github.com/apache/flink/pull/27111#discussion_r2454406713
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -240,7 +248,28 @@ private static List<List<String>> getAllIndexesColumnsOfTable(
private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) {
JoinInfo joinInfo = join.analyzeCondition();
// there must be one pair of join key
- return !joinInfo.pairs().isEmpty();
+ if (joinInfo.pairs().isEmpty()) {
+ return false;
+ }
+
+ // if this join output cdc records, the non-equiv condition must be applied on upsert key
Review Comment:
You raised a very important issue. You can refer to [FlinkChangelogModeInferenceProgram.scala](https://github.com/apache/flink/blob/64ce0c283633fed3715558d59d9de9e411289718/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala#L720) to understand the problem better. The essence of the issue is whether we can allow the upstream to omit sending UPDATE_BEFORE (UB) messages when a filter is present. Filter conditions fall into two scenarios: 1. filtering only on upsert key columns, and 2. filtering on non-upsert-key columns. The difference is that in scenario 1 we can optimize the upstream further and let it skip sending UB. Every version of a row with the same upsert key evaluates such a filter the same way, so an UPDATE_AFTER alone is enough for downstream.
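To make the two scenarios concrete, here is a small self-contained Java simulation. It does not use Flink's API, and `FilterUpdateBeforeDemo`, `Kind`, `Change` and `materialize` are illustrative names rather than Flink classes. The setup is a changelog keyed by an upsert key, in which key 1 changes its value from 20 to 5. The changelog passes through a filter and is then materialized by key, roughly the way an upsert sink would do it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FilterUpdateBeforeDemo {

    /** Simplified row kinds, named after Flink's RowKind values (illustrative only). */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** A changelog record: its kind, its upsert key, and one non-key value column. */
    record Change(Kind kind, int key, int value) {}

    /** Filters the changelog, then materializes it into a key -> value table like an upsert sink. */
    static Map<Integer, Integer> materialize(List<Change> changes, Predicate<Change> filter) {
        Map<Integer, Integer> table = new HashMap<>();
        for (Change c : changes) {
            if (!filter.test(c)) {
                continue; // records rejected by the filter never reach the sink
            }
            switch (c.kind()) {
                case INSERT, UPDATE_AFTER -> table.put(c.key(), c.value()); // upsert by key
                case UPDATE_BEFORE, DELETE -> table.remove(c.key());        // retract by key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // key 1 is updated from value 20 to value 5
        List<Change> withUb = List.of(
                new Change(Kind.INSERT, 1, 20),
                new Change(Kind.UPDATE_BEFORE, 1, 20),
                new Change(Kind.UPDATE_AFTER, 1, 5));
        List<Change> withoutUb = List.of(
                new Change(Kind.INSERT, 1, 20),
                new Change(Kind.UPDATE_AFTER, 1, 5));

        Predicate<Change> onNonKey = c -> c.value() > 10; // scenario 2: non-upsert-key column
        Predicate<Change> onKey = c -> c.key() == 1;      // scenario 1: upsert key only

        // correct: the UB (value 20) passes the filter and retracts key 1
        System.out.println(materialize(withUb, onNonKey));    // {}
        // wrong: the UA (value 5) is filtered out, so key 1 keeps the stale value 20
        System.out.println(materialize(withoutUb, onNonKey)); // {1=20}
        // correct without UB: every version of key 1 passes, so the UA simply overwrites
        System.out.println(materialize(withoutUb, onKey));    // {1=5}
    }
}
```

The simulation assumes the sink upserts and retracts purely by key. It is meant only to show why scenario 2 needs UB while scenario 1 does not.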
Currently, FlinkChangelogModeInferenceProgram only checks the filter on the Calc node. However, a join node carries similar filters (split into join keys and non-equi join conditions), and this case has been overlooked. To keep this pull request focused, I only make sure here that a join which outputs CDC records and whose non-equi condition touches non-upsert-key columns is not converted into a delta join (since a delta join cannot consume UB messages). The existing logic for regular streaming joins is left unchanged.
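Below is a hedged sketch of the rule described above. It is written as plain Java rather than against the planner's RelNode/RexNode APIs, and it is not the code in this PR. Everything in it is hypothetical:

- the class name and parameters are illustrative;
- `emitsUpdates` stands in for "this join outputs CDC records";
- it simplifies by assuming the condition's column indexes and the upsert keys share one index space. A real check would have to map the condition onto each join input.

```java
import java.util.List;
import java.util.Set;

public class DeltaJoinConditionCheck {

    /**
     * Hypothetical version of the rule: the join needs at least one equi-join key pair, and if
     * it emits update records, its non-equi condition may only reference upsert-key columns.
     *
     * @param equiPairCount  number of equi-join key pairs (like joinInfo.pairs().size())
     * @param emitsUpdates   whether the join produces CDC (update/delete) records
     * @param nonEquiColumns column indexes referenced by the non-equi condition
     * @param upsertKeys     upsert keys, each a set of column indexes in the same index space
     */
    static boolean areJoinConditionsSupported(
            int equiPairCount,
            boolean emitsUpdates,
            Set<Integer> nonEquiColumns,
            List<Set<Integer>> upsertKeys) {
        if (equiPairCount == 0) {
            return false; // there must be at least one pair of join keys
        }
        if (!emitsUpdates || nonEquiColumns.isEmpty()) {
            return true; // insert-only output, or no non-equi condition to check
        }
        // filtering on a non-upsert-key column would require UB messages,
        // which a delta join cannot consume, so reject that case
        return upsertKeys.stream().anyMatch(key -> key.containsAll(nonEquiColumns));
    }

    public static void main(String[] args) {
        List<Set<Integer>> upsertKeys = List.of(Set.of(0)); // column 0 is the only upsert key
        System.out.println(areJoinConditionsSupported(1, true, Set.of(0), upsertKeys));  // true
        System.out.println(areJoinConditionsSupported(1, true, Set.of(2), upsertKeys));  // false
        System.out.println(areJoinConditionsSupported(1, false, Set.of(2), upsertKeys)); // true
        System.out.println(areJoinConditionsSupported(0, false, Set.of(), upsertKeys));  // false
    }
}
```

Passing pre-computed column sets keeps the sketch self-contained. In the planner, these inputs would presumably come from the join's analyzed condition and the upsert-key metadata of its inputs.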
As for why the join keys don't need to be handled here: that case is already covered in [FlinkChangelogModeInferenceProgram.scala](https://github.com/apache/flink/blob/64ce0c283633fed3715558d59d9de9e411289718/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala#L662).
--
This is an automated message from the Apache Git Service.