cloud-fan commented on code in PR #56061:
URL: https://github.com/apache/spark/pull/56061#discussion_r3301377768


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WidenStatefulOperatorAttributeNullability.scala:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Shared helpers for the stateful-operator nullability fix. The fix has three
+ * independent components, all gated by
+ * [[SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT]] (pinned per-query via 
the
+ * offset log so existing queries keep their pre-fix behavior on restart):
+ *
+ *   - (a) `widenStateSchema`: explicit `asNullable` at every state-schema 
construction
+ *         site in each stateful physical exec.
+ *   - (b) `widenOutputForStatefulOp`: a per-op `output` override on every 
stateful logical
+ *         and physical operator, used by the operator's `output` definition.
+ *   - (c) [[WidenStatefulOperatorAttributeNullability]] (defined below in 
this file): a
+ *         custom optimizer rule that widens `AttributeReference`s inside 
stateful ops'
+ *         internal expressions and propagates upward to ancestor expressions.
+ */
+object WidenStatefulOpNullability {
+
+  def isEnabled: Boolean =
+    SQLConf.get.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT)
+
+  /**
+   * Recursively widens an attribute to be fully nullable: outer `nullable = 
true` plus
+   * every nested `StructField.nullable`, `ArrayType.containsNull`, and
+   * `MapType.valueContainsNull` flipped to `true` via
+   * [[org.apache.spark.sql.types.DataType#asNullable]].
+   */
+  def deepWidenAttribute(a: Attribute): Attribute = a match {
+    case ref: AttributeReference =>
+      AttributeReference(
+        ref.name, ref.dataType.asNullable, nullable = true, ref.metadata)(
+        ref.exprId, ref.qualifier)
+    case other => other.withNullability(true)
+  }
+
+  /**
+   * Component (a): widens a state schema to fully nullable. Stateful physical 
execs apply
+   * this at every `validateAndMaybeEvolveStateSchema(...)` call site and every
+   * `mapPartitionsWith*StateStore(...)` call site. When the conf is off, 
returns the
+   * schema unchanged.
+   */
+  def widenStateSchema(schema: StructType): StructType =
+    if (isEnabled) schema.asNullable else schema
+
+  /**
+   * Component (b): wraps a stateful operator's `output` to be fully nullable. 
The caller
+   * is responsible for only calling this from within an `output` definition 
on a stateful
+   * operator; gating is handled here via [[isEnabled]].
+   */
+  def widenOutputForStatefulOp(base: Seq[Attribute]): Seq[Attribute] =
+    if (isEnabled) base.map(deepWidenAttribute) else base
+}
+
+/**
+ * Component (c) of the stateful-operator nullability fix: a custom optimizer 
rule that
+ * widens `AttributeReference`s inside streaming-stateful operators' internal 
expressions
+ * and propagates the widening upward to ancestor operators' expressions.
+ *
+ * The rule does NOT introduce any new logical or physical node. It is purely 
an
+ * attribute-rewrite pass: for every node whose subtree contains a stateful 
operator,
+ * collect `exprId`s from `p.output` plus children's output, then deep-widen 
every
+ * `AttributeReference` in the node's expressions whose `exprId` is in that 
set via
+ * [[WidenStatefulOpNullability#deepWidenAttribute]].
+ *
+ * At a stateful operator itself, all children's output attributes are 
included because
+ * the operator's internal expressions (e.g. grouping keys) reference them 
directly.
+ * At non-stateful ancestor operators, only children whose subtrees contain a 
stateful
+ * operator are included, to avoid unnecessary widening of non-stateful 
siblings.
+ *
+ * '''Scope.''' The walk only fires on nodes whose subtree contains a stateful 
operator.
+ *
+ * '''Ordering constraint.''' This rule must run AFTER every 
`UpdateAttributeNullability`
+ * invocation in both the main optimizer and AQE.
+ *
+ * '''Idempotence.''' [[WidenStatefulOpNullability#deepWidenAttribute]] is 
idempotent.
+ */
+object WidenStatefulOperatorAttributeNullability extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT) ||
+        !plan.containsStatefulOperator) {
+      return plan
+    }
+    plan.resolveOperatorsUp {
+      case p if !p.resolved => p
+      case p: LeafNode => p
+      case p if !p.containsStatefulOperator => p
+      case p =>
+        val childOutputs = if (p.isStateful) {
+          p.children.flatMap(_.output)
+        } else {
+          p.children.filter(_.containsStatefulOperator).flatMap(_.output)
+        }
+        val widenableExprIds: Set[ExprId] =
+          (p.output ++ childOutputs)

Review Comment:
   Follow-up to 3289376367. Filtering `childOutputs` to stateful subtrees only 
tightens half of the union — `p.output` is still pulled in unconditionally. For 
an Inner / Outer / Full `Join`, `Join.output = left.output ++ right.output` 
carries the non-stateful side's `exprId`s, so for a non-stream-stream `Join` 
above `[stateful, batch]`, references to the batch side in the join condition 
still end up in `widenableExprIds` and still get widened. (For `LeftSemi` / 
`LeftAnti` where `Join.output = left.output`, the filter does fully help.)
   
   Not a blocker for this PR, but it means the comment at lines 90-91 ("only 
children whose subtrees contain a stateful operator are included, to avoid 
unnecessary widening of non-stateful siblings") slightly overstates the scope 
for mixed-stateful `Join`s. Two ways to reconcile in a follow-up:
   - Tighten the code: compute `statefulExprIds` from stateful children's 
outputs and use `(p.output.filter(ar => statefulExprIds.contains(ar.exprId)) ++ 
statefulExprIds-attrs)` so the Inner/Outer-join case is also handled.
   - Or keep the partial fix and weaken the comment to call out the 
mixed-stateful `Join` caveat.
   
   Happy with either path.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3440,6 +3440,23 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT =
+    
buildConf("spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled")
+      .internal()
+      .doc("When true, every streaming stateful operator reports its output 
schema with " +
+        "nullable=true on all columns (including nested struct fields, array 
elements, and " +
+        "map values), the state schema is widened at every construction site, 
and the state " +
+        "schema is widened at every construction site, so the existing state 
schema " +

Review Comment:
   Apologies — late catch from the prior re-review, the suggestion accept in 
`f69c587` left two copies of the same clause. Line 3448 ends `... the state 
schema is widened at every construction site, and the state ` and line 3449 
starts `schema is widened at every construction site, so ...`. One should go — 
likely the intent was to combine the original "state schema is widened" + the 
new "compatibility check trivially passes" parts into a single sentence.
   
   ```suggestion
           "map values), and the state schema is widened at every construction 
site, so the " +
           "existing state schema compatibility check trivially passes 
regardless of input " +
           "nullability. " +
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to