xuyangzhong commented on code in PR #27111:
URL: https://github.com/apache/flink/pull/27111#discussion_r2454406713


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -240,7 +248,28 @@ private static List<List<String>> getAllIndexesColumnsOfTable(
     private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) {
         JoinInfo joinInfo = join.analyzeCondition();
         // there must be one pair of join key
-        return !joinInfo.pairs().isEmpty();
+        if (joinInfo.pairs().isEmpty()) {
+            return false;
+        }
+
+        // if this join output cdc records, the non-equiv condition must be applied on upsert key

Review Comment:
   You raised a very important issue. You can refer to [FlinkChangelogModeInferenceProgram.scala](https://github.com/apache/flink/blob/64ce0c283633fed3715558d59d9de9e411289718/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala#L720) to understand the problem better. The essence of the issue is whether the upstream may omit UPDATE_BEFORE (UB) messages when a filter is present. The filter condition falls into one of two cases:

   1. It filters only on upsert key columns.
   2. It filters on non-upsert-key columns.

   The difference is that in case 1 the filter result cannot change between two versions of the same key, so the upstream can be further optimized to skip sending UB; in case 2 the UB is still needed.
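   To make the two cases concrete, here is a minimal, self-contained Java simulation. It is not Flink code: `UpsertFilterDemo`, `Row`, `Kind` and `filterAndMaterialize` are hypothetical names for illustration, and `Kind` only mirrors the four values of Flink's `RowKind`. It applies a filter to a changelog whose upsert key is `id` and materializes the surviving changes keyed by `id`, roughly as an upsert sink would. A filter on the non-upsert column `status` leaves a stale row when UB is omitted, while a filter on the upsert key stays correct with UPDATE_AFTER alone.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical illustration only; not part of Flink or of this PR. */
public class UpsertFilterDemo {

    // Mirrors the four row kinds of a Flink changelog.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // A change for a table whose upsert key is `id`.
    record Row(Kind kind, int id, String status) {}

    /** Filters each change, then materializes survivors keyed by the upsert key. */
    static Map<Integer, String> filterAndMaterialize(List<Row> changes, Predicate<Row> filter) {
        Map<Integer, String> table = new HashMap<>();
        for (Row r : changes) {
            if (!filter.test(r)) {
                continue; // change dropped by the filter
            }
            switch (r.kind()) {
                case INSERT, UPDATE_AFTER -> table.put(r.id(), r.status());
                case UPDATE_BEFORE, DELETE -> table.remove(r.id());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Key 1 starts as 'open' and is later updated to 'closed'.
        List<Row> withUb = List.of(
                new Row(Kind.INSERT, 1, "open"),
                new Row(Kind.UPDATE_BEFORE, 1, "open"),
                new Row(Kind.UPDATE_AFTER, 1, "closed"));
        List<Row> withoutUb = List.of(
                new Row(Kind.INSERT, 1, "open"),
                new Row(Kind.UPDATE_AFTER, 1, "closed"));

        Predicate<Row> onNonUpsertKey = r -> r.status().equals("open");
        Predicate<Row> onUpsertKey = r -> r.id() == 1;

        System.out.println(filterAndMaterialize(withUb, onNonUpsertKey));    // {} (correct)
        System.out.println(filterAndMaterialize(withoutUb, onNonUpsertKey)); // {1=open} (stale)
        System.out.println(filterAndMaterialize(withoutUb, onUpsertKey));    // {1=closed} (correct)
    }
}
```

   Without the UB, nothing retracts the old `open` version once the new `closed` version is filtered out; a predicate on `id` never flips for the same key, so that retraction is never needed.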
   
   Currently, FlinkChangelogModeInferenceProgram only checks the filter on the Calc node. However, the join node carries similar filters (split into join keys and non-equi join conditions), and this case has been overlooked. To avoid expanding the scope of this pull request, I only make sure here that such a join is not optimized into a delta join (since a delta join cannot consume UB messages), without changing the existing logic for the streaming join.
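   The rule stated in the diff comment (a join that outputs CDC records may only become a delta join if its non-equi condition is applied on an upsert key) can be sketched as a plain Java predicate. This is a hypothetical helper, not the code in this PR: `DeltaJoinConditionCheck` and `nonEquiConditionOnUpsertKey` are illustrative names, column references are modeled as `Set<Integer>` field indices (the planner itself would more likely work with Calcite's `ImmutableBitSet`), and which upsert keys the caller passes in is left as an assumption.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the delta-join restriction; not the PR's implementation. */
public final class DeltaJoinConditionCheck {

    private DeltaJoinConditionCheck() {}

    /**
     * Returns true if the join may still become a delta join.
     *
     * @param producesUpdates whether the join outputs CDC (update) records
     * @param nonEquiConditionColumns field indices referenced by the non-equi condition
     * @param upsertKeys candidate upsert keys, each a set of field indices (assumed input)
     */
    static boolean nonEquiConditionOnUpsertKey(
            boolean producesUpdates,
            Set<Integer> nonEquiConditionColumns,
            List<Set<Integer>> upsertKeys) {
        if (!producesUpdates || nonEquiConditionColumns.isEmpty()) {
            // Insert-only output, or no residual condition: no row can flip in or out.
            return true;
        }
        // The condition must reference columns of a single upsert key only,
        // so its result is fixed for every version of the same key.
        for (Set<Integer> key : upsertKeys) {
            if (key.containsAll(nonEquiConditionColumns)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical layout: 0 = order_id (upsert key), 1 = status, 2 = amount.
        List<Set<Integer>> upsertKeys = List.of(Set.of(0));
        System.out.println(nonEquiConditionOnUpsertKey(true, Set.of(0), upsertKeys));    // true
        System.out.println(nonEquiConditionOnUpsertKey(true, Set.of(0, 2), upsertKeys)); // false
        System.out.println(nonEquiConditionOnUpsertKey(false, Set.of(2), upsertKeys));   // true
    }
}
```

   The sketch requires containment in one upsert key rather than in the union of several keys, because only the columns of the key that identifies a row are guaranteed to stay fixed across its updates.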
   
   As for why the join key does not need to be handled here: that case is already covered in [FlinkChangelogModeInferenceProgram.scala](https://github.com/apache/flink/blob/64ce0c283633fed3715558d59d9de9e411289718/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala#L662).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
