featzhang commented on code in PR #28090:
URL: https://github.com/apache/flink/pull/28090#discussion_r3235886217


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -1599,6 +1631,90 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
     }
   }
 
+  private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes: 
Seq[RexNode]): Boolean = {
+    if (rexNodes.isEmpty) {
+      return false
+    }
+
+    val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(node.getCluster.getMetadataQuery)
+    val upsertKeys = fmq.getUpsertKeys(node)
+
+    if (upsertKeys == null || upsertKeys.isEmpty) {
+      return true
+    }
+
+    val fieldRefIndices = ImmutableBitSet.of(
+      
RexNodeExtractor.extractRefInputFields(JavaScalaConversionUtil.toJava(rexNodes)):
 _*)
+
+    !upsertKeys.exists(upsertKey => upsertKey.contains(fieldRefIndices))
+  }
+
+  private def hasNonUpsertKeyFilterPushedDown(ts: 
StreamPhysicalTableSourceScan): Boolean = {
+    val tableSourceTable = ts.getTable.unwrap(classOf[TableSourceTable])
+    if (tableSourceTable == null) {
+      return false
+    }
+
+    val filterSpec = tableSourceTable.abilitySpecs
+      .collectFirst { case spec: FilterPushDownSpec => spec }
+
+    filterSpec match {
+      case Some(spec) =>
+        val predicates = JavaScalaConversionUtil.toScala(spec.getPredicates)
+        referencesNonUpsertKeyColumns(ts, predicates)
+      case None => false

Review Comment:
   Index-space mismatch worth checking: `FilterPushDownSpec.getPredicates()` 
keeps `RexNode`s indexed against the **original source row type** (see 
`FilterPushDownSpec#resolvePredicates`, which uses `context.getSourceRowType()`). 
`fmq.getUpsertKeys(node)` inside `referencesNonUpsertKeyColumns` (invoked with 
`ts` on the next line) is computed against the **current** scan row type, which 
is the projected/reordered schema once `ProjectPushDownSpec` has been applied. 
When both abilities are present, `upsertKey.contains(fieldRefIndices)` compares 
indices from two different schemas. As a result, a filter on an upsert-key column 
can be misclassified as non-upsert, and the UPDATE_BEFORE (UB) drop is needlessly 
suppressed. That direction is safe for correctness, but the optimization silently 
regresses. The reverse case also follows from the same mismatch and is worth 
ruling out: a non-key column whose source index happens to coincide with a key 
index in the projected schema would wrongly allow the drop. Remapping the 
predicate indices through the project spec, or at minimum adding an integration 
test (IT) that covers filter and project pushdown together, would lock this down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to