featzhang commented on code in PR #28090:
URL: https://github.com/apache/flink/pull/28090#discussion_r3235886217
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -1599,6 +1631,90 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
}
}
+ private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes:
Seq[RexNode]): Boolean = {
+ if (rexNodes.isEmpty) {
+ return false
+ }
+
+ val fmq =
FlinkRelMetadataQuery.reuseOrCreate(node.getCluster.getMetadataQuery)
+ val upsertKeys = fmq.getUpsertKeys(node)
+
+ if (upsertKeys == null || upsertKeys.isEmpty) {
+ return true
+ }
+
+ val fieldRefIndices = ImmutableBitSet.of(
+
RexNodeExtractor.extractRefInputFields(JavaScalaConversionUtil.toJava(rexNodes)):
_*)
+
+ !upsertKeys.exists(upsertKey => upsertKey.contains(fieldRefIndices))
+ }
+
+ private def hasNonUpsertKeyFilterPushedDown(ts:
StreamPhysicalTableSourceScan): Boolean = {
+ val tableSourceTable = ts.getTable.unwrap(classOf[TableSourceTable])
+ if (tableSourceTable == null) {
+ return false
+ }
+
+ val filterSpec = tableSourceTable.abilitySpecs
+ .collectFirst { case spec: FilterPushDownSpec => spec }
+
+ filterSpec match {
+ case Some(spec) =>
+ val predicates = JavaScalaConversionUtil.toScala(spec.getPredicates)
+ referencesNonUpsertKeyColumns(ts, predicates)
+ case None => false
Review Comment:
Index-space mismatch worth checking: `FilterPushDownSpec.getPredicates()`
keeps `RexNode`s indexed against the **original source row type** (see
`FilterPushDownSpec#resolvePredicates` using `context.getSourceRowType()`).
`fmq.getUpsertKeys(node)` inside `referencesNonUpsertKeyColumns` (called with
`ts` on the line above) is computed against the **current**
scan row type — which is the projected/reordered schema once
`ProjectPushDownSpec` has fired. When both abilities are present,
`upsertKey.contains(fieldRefIndices)` compares indices from two different
schemas, so a filter on an upsert-key column can be misclassified as
referencing a non-upsert-key column
and the UPDATE_BEFORE (UB) drop is needlessly suppressed (correctness is safe, but the
optimization silently regresses). A remap through the project spec, or at least
an `IT` covering `filter + project pushdown` together, would lock this down.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]