fresh-borzoni commented on code in PR #28090:
URL: https://github.com/apache/flink/pull/28090#discussion_r3373769865


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -659,26 +662,31 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 
         case join: StreamPhysicalJoin =>
           val onlyAfterByParent = requiredUpdateTrait.updateKind == 
UpdateKind.ONLY_UPDATE_AFTER
-          val children = join.getInputs.zipWithIndex.map {
-            case (child, childOrdinal) =>
-              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
-              val supportOnlyAfter = 
join.inputUniqueKeyContainsJoinKey(childOrdinal)
-              val inputModifyKindSet = getModifyKindSet(physicalChild)
-              if (onlyAfterByParent) {
-                if (inputModifyKindSet.contains(ModifyKind.UPDATE) && 
!supportOnlyAfter) {
-                  // the parent requires only-after, however, the join doesn't 
support this
-                  None
-                } else {
-                  this.visit(physicalChild, 
onlyAfterOrNone(inputModifyKindSet))
-                }
-              } else {
-                this.visit(physicalChild, 
beforeAfterOrNone(inputModifyKindSet))
-              }
-          }
-          if (children.exists(_.isEmpty)) {
+          if (onlyAfterByParent && hasNonUpsertKeyNonEquiCondition(join)) {

Review Comment:
    I don't think we can - it gates delta-join conversion in 
DeltaJoinRewriteRule, which is a separate path that changelog-mode inference 
doesn't touch. 
    If we dropped it, a non-upsert-key condition could turn into a delta join 
with nothing keeping UB. I did remove some duplication though



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to