xuyangzhong commented on code in PR #28090:
URL: https://github.com/apache/flink/pull/28090#discussion_r3186747532
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -1599,6 +1631,90 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
}
}
+  private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes: Seq[RexNode]): Boolean = {
Review Comment:
Maybe we'd better extract a common method from `referencesNonUpsertKeyColumns`, `referencesNonUpsert` and `isNonUpsertKeyCondition`, WDYT?
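
For illustration, the shared helper could look roughly like the sketch below. This is only a sketch: the name `referencesNonUpsertKey` is hypothetical, and it assumes all three checks boil down to "the input fields referenced by the expressions are not fully covered by any single upsert key of the node".

```scala
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex.RexNode
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery

import scala.collection.JavaConverters._

// Hypothetical shared helper (sketch, not the PR's actual code).
private def referencesNonUpsertKey(node: RelNode, rexNodes: Seq[RexNode]): Boolean = {
  val fmq = FlinkRelMetadataQuery.reuseOrCreate(node.getCluster.getMetadataQuery)
  val upsertKeys = fmq.getUpsertKeys(node)
  // collect all input fields referenced by the given expressions
  val referenced = rexNodes
    .map(r => RelOptUtil.InputFinder.bits(r))
    .foldLeft(ImmutableBitSet.of())(_ union _)
  if (upsertKeys == null || upsertKeys.isEmpty) {
    // no upsert key at all: any referenced field counts as a non-upsert-key field
    !referenced.isEmpty
  } else {
    // a non-upsert-key reference exists if no single upsert key covers every referenced field
    !upsertKeys.asScala.exists(_.contains(referenced))
  }
}
```

The three call sites would then only differ in how they collect the `rexNodes` to pass in.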
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -1303,27 +1319,35 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(process, Some(children), providedDeleteTrait)
case join: StreamPhysicalJoin =>
Review Comment:
Nit: should we also handle `StreamPhysicalChangelogNormalize`,
`StreamPhysicalTemporalJoin`, `StreamPhysicalLookupJoin`,
`StreamPhysicalCorrelateBase`, `StreamPhysicalIntervalJoin`,
`StreamPhysicalWindowJoin` and `StreamPhysicalMultiJoin`? They contain filters
too.
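
To make the scope concrete, here is a rough sketch of the candidate set (only the class names come from the list above; how each operator's condition would then be extracted and fed through the non-upsert-key check is left open):

```scala
import org.apache.flink.table.planner.plan.nodes.physical.stream._

// Hypothetical predicate, not part of the PR: marks stream operators that carry a
// filter-like condition and would need the same non-upsert-key handling as the join.
private def carriesFilterCondition(rel: StreamPhysicalRel): Boolean = rel match {
  case _: StreamPhysicalJoin | _: StreamPhysicalChangelogNormalize |
      _: StreamPhysicalTemporalJoin | _: StreamPhysicalLookupJoin |
      _: StreamPhysicalCorrelateBase | _: StreamPhysicalIntervalJoin |
      _: StreamPhysicalWindowJoin | _: StreamPhysicalMultiJoin =>
    true
  case _ => false
}
```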
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala:
##########
@@ -441,6 +441,189 @@ class ChangelogSourceITCase(
}
}
+ @TestTemplate
+ def testFilterPushedDownOnNonUpsertKey(): Unit = {
+    // FLINK-38579: Filter pushed down to source on non-upsert key should require UPDATE_BEFORE
+
+ val testDataId = TestValuesTableFactory.registerData(
+ Seq(
+ changelogRow("+I", Int.box(1), "tom", Int.box(1)),
+ changelogRow("-U", Int.box(1), "tom", Int.box(1)),
+ changelogRow("+U", Int.box(1), "tom", Int.box(2))
+ ))
+ tEnv.executeSql(s"""
+ |CREATE TABLE t (
+ | a int primary key not enforced,
+ | b varchar,
+ | c int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$testDataId',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'filterable-fields' = 'c'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE s (
+ | a int primary key not enforced,
+ | b varchar,
+ | c int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false',
+ | 'sink-changelog-mode-enforced' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+    // CDC duplicate + MiniBatch is incompatible: ChangelogNormalize (needed for CDC deduplication)
+    // requires ONLY_UPDATE_AFTER at the source level, but filter on non-upsert key requires UPDATE_BEFORE
+    if (sourceMode == CHANGELOG_SOURCE_WITH_EVENTS_DUPLICATE && miniBatch == MiniBatchOn) {
+      assertThatThrownBy(() => tEnv.executeSql("insert into s select * from t where c < 2"))
+ .isInstanceOf(classOf[org.apache.flink.table.api.TableException])
+ .hasMessageContaining("Can't generate a valid execution plan")
+ return
+ }
+
+ tEnv.executeSql("insert into s select * from t where c < 2").await()
+
+ // The result should be empty because:
+ // - Filter c < 2 matches only c=1
+    // - The record (1,tom,1) was inserted (+I) and then retracted (-U)
+    // - With the fix, UPDATE_BEFORE is preserved and correctly deletes the record
+    // - Without the fix, UPDATE_BEFORE would be dropped, leaving (1,tom,1) in the result
+ val expected = List[String]()
+
+    assertThat(TestValuesTableFactory.getResultsAsStrings("s").sorted).isEqualTo(expected.sorted)
+ }
+
+ @TestTemplate
+ def testJoinWithNonEquivConditionOnNonUpsertKey(): Unit = {
Review Comment:
Nit: could we extract the common logic shared by `testJoinWithNonEquivConditionOnNonUpsertKey` and `testJoinWithNonEquivConditionOnRightNonUpsertKey`?
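
For example, something along these lines (sketch only: the two tests' bodies aren't shown in this hunk, so the helper name, its parameters and the sink table name "s" are assumptions mirroring the test above):

```scala
// Hypothetical shared helper: each join test would then only differ in the
// INSERT query it runs and the rows it expects in the sink.
private def assertJoinWithNonEquivCondition(insertQuery: String, expected: List[String]): Unit = {
  tEnv.executeSql(insertQuery).await()
  assertThat(TestValuesTableFactory.getResultsAsStrings("s").sorted).isEqualTo(expected.sorted)
}
```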
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala:
##########
@@ -441,6 +441,189 @@ class ChangelogSourceITCase(
}
}
+ @TestTemplate
+ def testFilterPushedDownOnNonUpsertKey(): Unit = {
+    // FLINK-38579: Filter pushed down to source on non-upsert key should require UPDATE_BEFORE
+
+ val testDataId = TestValuesTableFactory.registerData(
+ Seq(
+ changelogRow("+I", Int.box(1), "tom", Int.box(1)),
+ changelogRow("-U", Int.box(1), "tom", Int.box(1)),
+ changelogRow("+U", Int.box(1), "tom", Int.box(2))
+ ))
+ tEnv.executeSql(s"""
+ |CREATE TABLE t (
+ | a int primary key not enforced,
+ | b varchar,
+ | c int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$testDataId',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'filterable-fields' = 'c'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE s (
+ | a int primary key not enforced,
+ | b varchar,
+ | c int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false',
+ | 'sink-changelog-mode-enforced' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+    // CDC duplicate + MiniBatch is incompatible: ChangelogNormalize (needed for CDC deduplication)
+    // requires ONLY_UPDATE_AFTER at the source level, but filter on non-upsert key requires UPDATE_BEFORE
+    if (sourceMode == CHANGELOG_SOURCE_WITH_EVENTS_DUPLICATE && miniBatch == MiniBatchOn) {
+      assertThatThrownBy(() => tEnv.executeSql("insert into s select * from t where c < 2"))
+ .isInstanceOf(classOf[org.apache.flink.table.api.TableException])
+ .hasMessageContaining("Can't generate a valid execution plan")
Review Comment:
Perhaps we should fix this issue, which is introduced by this PR, together with it, or at least ensure it's resolved in the same release.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala:
##########
@@ -441,6 +441,189 @@ class ChangelogSourceITCase(
}
}
+ @TestTemplate
+ def testFilterPushedDownOnNonUpsertKey(): Unit = {
+    // FLINK-38579: Filter pushed down to source on non-upsert key should require UPDATE_BEFORE
+
+ val testDataId = TestValuesTableFactory.registerData(
+ Seq(
+ changelogRow("+I", Int.box(1), "tom", Int.box(1)),
+ changelogRow("-U", Int.box(1), "tom", Int.box(1)),
+ changelogRow("+U", Int.box(1), "tom", Int.box(2))
+ ))
+ tEnv.executeSql(s"""
+ |CREATE TABLE t (
+ | a int primary key not enforced,
+ | b varchar,
+ | c int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$testDataId',
+ | 'changelog-mode' = 'I,UA,UB,D',
+ | 'filterable-fields' = 'c'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE s (
+ | a int primary key not enforced,
+ | b varchar,
+ | c int
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false',
+ | 'sink-changelog-mode-enforced' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+    // CDC duplicate + MiniBatch is incompatible: ChangelogNormalize (needed for CDC deduplication)
+    // requires ONLY_UPDATE_AFTER at the source level, but filter on non-upsert key requires UPDATE_BEFORE
+    if (sourceMode == CHANGELOG_SOURCE_WITH_EVENTS_DUPLICATE && miniBatch == MiniBatchOn) {
+      assertThatThrownBy(() => tEnv.executeSql("insert into s select * from t where c < 2"))
+ .isInstanceOf(classOf[org.apache.flink.table.api.TableException])
+ .hasMessageContaining("Can't generate a valid execution plan")
+ return
+ }
+
+ tEnv.executeSql("insert into s select * from t where c < 2").await()
+
+ // The result should be empty because:
+ // - Filter c < 2 matches only c=1
+    // - The record (1,tom,1) was inserted (+I) and then retracted (-U)
+    // - With the fix, UPDATE_BEFORE is preserved and correctly deletes the record
+    // - Without the fix, UPDATE_BEFORE would be dropped, leaving (1,tom,1) in the result
+ val expected = List[String]()
+
+    assertThat(TestValuesTableFactory.getResultsAsStrings("s").sorted).isEqualTo(expected.sorted)
+ }
+
+ @TestTemplate
+ def testJoinWithNonEquivConditionOnNonUpsertKey(): Unit = {
Review Comment:
Nit: rename to `testJoinWithNonEquivConditionOnLeftNonUpsertKey`?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -659,26 +662,31 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case join: StreamPhysicalJoin =>
        val onlyAfterByParent = requiredUpdateTrait.updateKind == UpdateKind.ONLY_UPDATE_AFTER
-        val children = join.getInputs.zipWithIndex.map {
-          case (child, childOrdinal) =>
-            val physicalChild = child.asInstanceOf[StreamPhysicalRel]
-            val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal)
-            val inputModifyKindSet = getModifyKindSet(physicalChild)
-            if (onlyAfterByParent) {
-              if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) {
-                // the parent requires only-after, however, the join doesn't support this
-                None
-              } else {
-                this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet))
-              }
-            } else {
-              this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet))
-            }
-        }
-        if (children.exists(_.isEmpty)) {
+ if (onlyAfterByParent && hasNonUpsertKeyNonEquiCondition(join)) {
Review Comment:
After this fix, we can safely delete the logic in
`DeltaJoinUtil#isFilterOnOneSetOfUpsertKeys`, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]