fresh-borzoni commented on code in PR #28090:
URL: https://github.com/apache/flink/pull/28090#discussion_r3373769865
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -659,26 +662,31 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
case join: StreamPhysicalJoin =>
val onlyAfterByParent = requiredUpdateTrait.updateKind ==
UpdateKind.ONLY_UPDATE_AFTER
- val children = join.getInputs.zipWithIndex.map {
- case (child, childOrdinal) =>
- val physicalChild = child.asInstanceOf[StreamPhysicalRel]
- val supportOnlyAfter =
join.inputUniqueKeyContainsJoinKey(childOrdinal)
- val inputModifyKindSet = getModifyKindSet(physicalChild)
- if (onlyAfterByParent) {
- if (inputModifyKindSet.contains(ModifyKind.UPDATE) &&
!supportOnlyAfter) {
- // the parent requires only-after, however, the join doesn't
support this
- None
- } else {
- this.visit(physicalChild,
onlyAfterOrNone(inputModifyKindSet))
- }
- } else {
- this.visit(physicalChild,
beforeAfterOrNone(inputModifyKindSet))
- }
- }
- if (children.exists(_.isEmpty)) {
+ if (onlyAfterByParent && hasNonUpsertKeyNonEquiCondition(join)) {
Review Comment:
I don't think we can - it gates delta-join conversion in
DeltaJoinRewriteRule, which is a separate path that changelog-mode inference
doesn't touch.
If we dropped it, a join with a non-upsert-key condition could be converted into
a delta join with nothing preserving UPDATE_BEFORE (UB) messages. I did remove
some duplication, though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]