[ https://issues.apache.org/jira/browse/SPARK-38205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492895#comment-17492895 ]

Jungtaek Lim commented on SPARK-38205:
--------------------------------------

I realized the output schema should also be relaxed to nullable (since the 
operator produces its output from state), and I'm now puzzled over whether 
there are cases where this would break existing queries (a DSv2 sink may 
check nullability when writing).

I guess another way is to never change the nullability in the optimizer and 
to keep the nullability check in state. I would prefer the less invasive 
way if there is one, since the lifetime of a streaming query is long, 
spanning Spark versions, and compatibility is a major concern.
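
A minimal sketch of that less invasive direction, assuming a standalone 
helper (the object name is illustrative, not the actual checker code): 
compare state schemas as if every column, nested field, array element, and 
map value were nullable, so an optimizer-driven nullability flip can never 
fail the check.

{code:scala}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Illustrative sketch only, not the actual checker implementation.
object RelaxedSchemaCheck {
  // Recursively mark every field, array element, and map value as nullable.
  private def relax(dt: DataType): DataType = dt match {
    case s: StructType =>
      StructType(s.fields.map(f => f.copy(dataType = relax(f.dataType), nullable = true)))
    case a: ArrayType =>
      a.copy(elementType = relax(a.elementType), containsNull = true)
    case m: MapType =>
      m.copy(keyType = relax(m.keyType), valueType = relax(m.valueType), valueContainsNull = true)
    case other => other
  }

  // Two state schemas are compatible if they match once nullability is ignored.
  def isCompatible(existing: StructType, candidate: StructType): Boolean =
    relax(existing) == relax(candidate)
}
{code}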

> The columns in state schema should be relaxed to be nullable
> ------------------------------------------------------------
>
>                 Key: SPARK-38205
>                 URL: https://issues.apache.org/jira/browse/SPARK-38205
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.1.2, 3.2.1, 3.3.0
>            Reporter: Jungtaek Lim
>            Priority: Major
>
> Starting from SPARK-27237, Spark validates the schema of state across query 
> runs to make sure it doesn't fall into weirder issues, like a SIGSEGV at 
> runtime.
> The comparison logic is reasonable in terms of nullability; it uses the 
> matrix below:
> ||existing schema||new schema||allowed||
> |nullable|nullable|O|
> |nullable|non-nullable|O|
> |non-nullable|nullable|X|
> |non-nullable|non-nullable|O|
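> The whole matrix collapses to a one-line predicate. As a sketch (the 
> function name is illustrative, not the actual checker code), the only 
> rejected transition is non-nullable (existing) to nullable (new):
> {code:scala}
> // Encodes the matrix above: reject only non-nullable -> nullable.
> def nullabilityCompatible(existingNullable: Boolean, newNullable: Boolean): Boolean =
>   existingNullable || !newNullable
> {code}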
> What we miss here is that the nullability of a column can be changed by the 
> optimizer (mostly from nullable to non-nullable), and that this nullability 
> optimization can apply differently after even simple changes to the query or 
> to Spark itself.
> So this scenario is hypothetically possible:
> 1. On the first run of the query, the optimizer marks some columns as 
> non-nullable instead of nullable, and that flows into the schema of the 
> state (the state schema now has a non-nullable column).
> 2. On the second run of the query (possibly with a code modification or a 
> Spark version upgrade), the optimizer no longer marks those columns as 
> non-nullable, so the comparison of the state schemas (existing vs. new) pits 
> non-nullable (existing) against nullable (new), which is NOT allowed.
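> Replaying the scenario with the predicate sketched above makes the 
> failure concrete:
> {code:scala}
> // Run 1 persisted the column as non-nullable; run 2 now derives it as nullable.
> nullabilityCompatible(existingNullable = false, newNullable = true)
> // => false, so the restarted query fails the state schema check
> {code}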
> From the storage point of view, the state store does not need to know 
> whether a column is non-nullable or nullable. Interface-wise, the state 
> store has no concept of a schema, so it is safe to relax this constraint and 
> leave the optimizer free to do whatever it wants without breaking stateful 
> operators.
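> As a sketch of the relaxation proposed here (the helper name is 
> illustrative), it would be enough to force every column to nullable before 
> the state schema is persisted, so the stored schema never encodes an 
> optimizer-derived non-nullability:
> {code:scala}
> import org.apache.spark.sql.types.StructType
>
> // Illustrative sketch: relax all top-level columns to nullable before
> // writing the state schema file (nested fields would need the same
> // recursive treatment as in the earlier sketch).
> def toStateSchema(schema: StructType): StructType =
>   StructType(schema.fields.map(_.copy(nullable = true)))
> {code}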


