HeartSaVioR opened a new pull request, #56061:
URL: https://github.com/apache/spark/pull/56061

   ### What changes were proposed in this pull request?
   
   Introduce a three-component fix for stateful-operator nullability drift, 
gated by `spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled` 
(pinned per-query via the offset log):
   
   - (a) `WidenStatefulOpNullability.widenStateSchema`: every stateful physical 
exec widens its state key/value schema to fully nullable at construction.
   - (b) `WidenStatefulOpNullability.widenOutputForStatefulOp`: every stateful 
logical and physical operator widens its declared `output` to fully nullable.
   - (c) `WidenStatefulOperatorAttributeNullability`: an optimizer rule that 
widens `AttributeReference`s inside stateful ops' internal expressions and 
propagates upward through ancestor expressions.
   
   With this fix, the state schema is always "fully" nullable (top-level 
columns, nested columns, and collection types) regardless of the input schema, 
and the output schema of the stateful operator is "fully" nullable as well. 
Widening the output schema is necessary because, even if the input schema is 
non-nullable, state can produce null values, so the output can contain nulls.
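   
   As an illustration of what "fully" nullable means, here is a minimal sketch 
of a deep-widening recursion. It is a toy model in plain Python, not the code 
in this PR: the classes `Atomic`, `Array`, `Map`, `Field`, `Struct` and the 
function `deep_widen` are hypothetical stand-ins for Spark's schema types and 
for the widening helper.

```python
from dataclasses import dataclass
from typing import Tuple, Union


# Toy stand-ins for Spark SQL schema types (hypothetical, not the real classes).
@dataclass(frozen=True)
class Atomic:
    name: str  # e.g. "long", "string"


@dataclass(frozen=True)
class Array:
    element: "DataType"
    contains_null: bool


@dataclass(frozen=True)
class Map:
    key: "DataType"
    value: "DataType"
    value_contains_null: bool


@dataclass(frozen=True)
class Field:
    name: str
    dtype: "DataType"
    nullable: bool


@dataclass(frozen=True)
class Struct:
    fields: Tuple[Field, ...]


DataType = Union[Atomic, Array, Map, Struct]


def deep_widen(dtype):
    """Return a copy of dtype in which every nullability flag is True."""
    if isinstance(dtype, Struct):
        return Struct(tuple(Field(f.name, deep_widen(f.dtype), True) for f in dtype.fields))
    if isinstance(dtype, Array):
        return Array(deep_widen(dtype.element), True)
    if isinstance(dtype, Map):
        return Map(deep_widen(dtype.key), deep_widen(dtype.value), True)
    return dtype  # atomic types carry no nullability flag of their own


# A schema with non-nullable flags at the top level, in a nested struct,
# and inside collection types.
strict = Struct((
    Field("id", Atomic("long"), False),
    Field("tags", Array(Atomic("string"), False), False),
    Field("attrs", Map(Atomic("string"), Struct((Field("n", Atomic("int"), False),)), False), True),
))
widened = deep_widen(strict)
```

   Because the widened form does not depend on the input's nullability flags, 
two inputs that differ only in nullability map to the same widened schema, and 
applying the function twice gives the same result as applying it once.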
   
   ### Why are the changes needed?
   
   This is a long-standing conflict between the streaming engine and the query optimizer.
   
   A streaming query is meant to be long-running and in many cases spans 
multiple Spark versions. Also, the logical plan is not always the same across 
batches (e.g. there are multiple stream sources and one of them has no new 
data at batch N). This exposes the streaming query to changes in analyzer and 
optimizer behavior.
   
   The state schema of a stateful operator is mostly determined by the 
operator's input schema, and nullability is no exception. If the input schema 
has a nullable column, the state schema has a nullable column, and likewise 
for non-nullable columns.
   
   One of the optimizations the query optimizer performs is flipping 
nullability, say, from nullable to non-nullable where appropriate. This can 
happen directly or indirectly, and the most problematic case is when the 
optimization is applied "selectively", i.e. in some batches or runs but not 
in others.
   
   An easy example is the elimination of Union branches: for a streaming query 
that combines multiple streams with Union, batch N could have one stream 
non-empty while another is empty. In that case, `PropagateEmptyRelation` can 
drop the empty `Union` branches, causing a per-column nullability flip that 
propagates into a stateful operator's state schema across microbatches or 
restarts. This causes either `STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE` on restart 
or a codegen NPE when state-restored rows carry nulls in columns declared 
non-nullable.
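   
   The Union scenario can be sketched with a toy model in plain Python. 
Nothing here is Spark code: `Branch`, `prune_empty`, `union_nullability`, 
`compatible`, and `widen` are hypothetical names. The sketch assumes that a 
Union column is nullable when it is nullable in any remaining branch, and that 
the state schema check rejects a nullable column where the stored schema 
declared it non-nullable.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Branch:
    """Toy Union child: per-column nullability plus an 'empty in this batch' marker."""
    nullable: Tuple[bool, ...]
    is_empty: bool


def prune_empty(branches):
    # Toy version of dropping empty Union branches.
    # If every branch is empty, this model leaves the plan unchanged.
    kept = [b for b in branches if not b.is_empty]
    return kept if kept else list(branches)


def union_nullability(branches):
    # Assumption: a Union output column is nullable if any branch has it nullable.
    return tuple(any(col) for col in zip(*(b.nullable for b in branches)))


def compatible(stored, current):
    # Assumption: a stored non-nullable column cannot accept a nullable one.
    return len(stored) == len(current) and all(s or not c for s, c in zip(stored, current))


def widen(flags):
    # Widening makes every column nullable regardless of the input.
    return tuple(True for _ in flags)


stream_a = Branch(nullable=(False, False), is_empty=False)
stream_b = Branch(nullable=(False, True), is_empty=False)
stream_b_idle = Branch(nullable=(False, True), is_empty=True)

# First run: stream B has no data, so its branch is dropped before the
# state schema is derived.
first_run = union_nullability(prune_empty([stream_a, stream_b_idle]))

# Restart: both streams have data, so the second column is nullable again.
restart = union_nullability(prune_empty([stream_a, stream_b]))

drift_detected = not compatible(first_run, restart)
widened_ok = compatible(widen(first_run), widen(restart))
```

   In this model the first run stores a schema with both columns 
non-nullable, the restart presents a nullable second column, and the check 
fails. With widening applied on both sides, the stored and current schemas 
are identical no matter which branches were dropped.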
   
   ### Does this PR introduce _any_ user-facing change?
   
   No user-visible behavior change for new queries (all stateful operator 
outputs become nullable, which is semantically correct). Existing queries keep 
their original behavior via the offset log gate.
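   
   The gating idea can be sketched as follows. This is a toy model under 
stated assumptions, not Spark's offset log implementation: the function 
`resolve_widening` and the dict-based "offset log" are hypothetical, and the 
sketch assumes that the flag defaults to enabled for new queries and that a 
checkpoint without a recorded value is treated as disabled.

```python
CONF_KEY = "spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled"


def resolve_widening(offset_log_conf, session_conf):
    """Decide whether a query widens nullability and which confs stay pinned.

    offset_log_conf: dict of confs recorded for the query in its checkpoint,
                     or None for a brand-new query (hypothetical representation).
    session_conf:    dict of confs of the current session.
    Returns (enabled, confs_to_persist).
    """
    if offset_log_conf is None:
        # New query: take the session value (assumed default: enabled) and pin it.
        enabled = session_conf.get(CONF_KEY, "true") == "true"
        return enabled, {CONF_KEY: "true" if enabled else "false"}
    # Existing query: the pinned value wins over the session value.
    # Assumption: a checkpoint that predates the flag has no entry and stays off.
    enabled = offset_log_conf.get(CONF_KEY, "false") == "true"
    return enabled, dict(offset_log_conf)


new_query = resolve_widening(None, {})
old_checkpoint = resolve_widening({}, {CONF_KEY: "true"})
pinned_on = resolve_widening({CONF_KEY: "true"}, {CONF_KEY: "false"})
```

   The point of pinning is that the decision made when the query first 
started stays in effect for the lifetime of the checkpoint, so changing the 
session value later does not change the state schema of a running query.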
   
   ### How was this patch tested?
   
   New `StreamingStatefulOperatorNullabilityDriftSuite` covering:
   - New-query path: Union-branch-drop restart scenarios for aggregate,
     dropDuplicates, dropDuplicatesWithinWatermark.
   - Codegen NPE regression with struct grouping keys.
   - Existing-query path: widening forced off still triggers schema mismatch.
   - Rule-level: scope check (non-stateful subtrees skipped).
   - Helper-level: `deepWidenAttribute` recursion into nested types.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes. Generated-by: Claude 4.7 Opus


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

