[ 
https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077819#comment-17077819
 ] 

Jungtaek Lim commented on SPARK-31376:
--------------------------------------

I'm saying that sort is, in theory, simply not an available operation for a 
streaming workload. Please don't set aside the core concept of streaming: the 
input is "unbounded".

Partition vs. global makes no difference: even for a single partition, 
sorting doesn't make sense given that the input is unbounded. If we allow sort 
within the micro-batch, the result of the same streaming query will differ 
depending on how batches are constructed, which I think is not what 
"Structured Streaming" proposes. (That's why the concept of "state" had to be 
introduced.) If we simply supported "operation within micro-batch", then 
global sort wouldn't be a problem either.
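
To make the batch-boundary point concrete, here is a minimal sketch in plain 
Python (not Spark code). The function name, the sample values, and the batch 
sizes are made up for illustration. It sorts each "micro-batch" on its own and 
shows that the same input yields differently ordered output depending only on 
where the batch boundaries fall.

```python
from typing import Iterable, List


def run_with_per_batch_sort(records: List[int], batch_sizes: Iterable[int]) -> List[int]:
    """Split `records` into micro-batches, sort each batch, and concatenate the output."""
    output: List[int] = []
    start = 0
    for size in batch_sizes:
        batch = records[start:start + size]
        output.extend(sorted(batch))  # the sort only sees the rows of this batch
        start += size
    return output


# The same input stream, in arrival order (illustrative values).
stream = [5, 1, 4, 2, 3, 0]

# Two runs of the "same query" that differ only in how batches are cut.
run_a = run_with_per_batch_sort(stream, [3, 3])
run_b = run_with_per_batch_sort(stream, [2, 4])

print(run_a)  # [1, 4, 5, 0, 2, 3]
print(run_b)  # [1, 5, 0, 2, 3, 4]
```

Both runs emit the same set of rows, but neither is sorted overall and the two 
orders disagree, so the effect of the sort depends on batch construction 
rather than on the query itself.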

"Same result" refers to the "output rows" Spark writes to the sink. It does 
not imply any physical guarantees such as order, partitioning, etc. From that 
viewpoint, the output should be the same between a batch query and a 
streaming query, and an operation is defined as "not supported" in Structured 
Streaming if it's impossible to do in a streaming query.

You wouldn't even get a chance to try sorting with a record-to-record 
streaming framework. It's only possible in the "micro-batch" model, because a 
"micro-batch" does have "batch" characteristics. You can get that benefit by 
using foreachBatch, which is better than supporting this natively: when you 
use foreachBatch, you recognize that your operations are no longer tied to 
streaming semantics. Supporting this natively would lead to confusion when 
reasoning about the behavior.
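
A minimal PySpark sketch of the foreachBatch approach follows. The output 
path, the sort column, and the streaming DataFrame name `stream_df` are 
hypothetical and not taken from this issue; only `foreachBatch`, 
`sortWithinPartitions`, and the batch writer calls are existing Spark APIs. 
The handler receives each micro-batch as an ordinary batch DataFrame, so the 
sort is explicitly scoped to that batch.

```python
# Hypothetical names, not from the thread: the output path and the sort column.
OUTPUT_PATH = "/tmp/sorted-output"
SORT_COLUMN = "key"


def sort_and_write_batch(batch_df, batch_id):
    """Handle one micro-batch as an ordinary (bounded) batch DataFrame."""
    # Inside foreachBatch, batch_df is a plain batch DataFrame, so
    # sortWithinPartitions can be applied to it; the sort covers only the
    # rows of this micro-batch, not the unbounded stream.
    (batch_df
        .sortWithinPartitions(SORT_COLUMN)
        .write
        .mode("append")
        .parquet(OUTPUT_PATH))


# Wiring it into a streaming query (needs a running SparkSession and a
# streaming DataFrame, here assumed to be called `stream_df`):
#
#   query = stream_df.writeStream.foreachBatch(sort_and_write_batch).start()
#   query.awaitTermination()
```

Because the sort lives in user code rather than in the streaming plan, it is 
clear to the reader that the ordering is a per-batch property only.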

> Also, I don't quite understand how repartitioning is valid on a streaming 
> query, I was surprised when that worked. That seems more questionable than 
> local sorting.

I'm not sure how you reached that conclusion. It's quite natural for 
repartitioning to be valid on a streaming query: shuffle has been provided 
since the early generation of streaming frameworks. The only difference is in 
how shuffling is handled: push-based vs. pull-based. With a record-to-record 
streaming framework it's likely push-based (as each output row should be sent 
downstream immediately), whereas with a batch-oriented streaming framework 
it's likely pull-based (though it can be push-based).

One more thing: the behavior of an operation must be consistent across data 
sources. If something makes sense for the file data source but not for the 
Kafka data source, the operation cannot be supported natively.

> Non-global sort support for structured streaming
> ------------------------------------------------
>
>                 Key: SPARK-31376
>                 URL: https://issues.apache.org/jira/browse/SPARK-31376
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.0
>            Reporter: Adam Binford
>            Priority: Minor
>
> Currently, all sorting is disallowed with structured streaming queries. Not 
> allowing global sorting makes sense, but could non-global sorting (i.e. 
> sortWithinPartitions) be allowed? I'm running into this with an external 
> source I'm using, but not sure if this would be useful to file sources as 
> well. I have to foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
>  * Does a local sort cause issues with any exactly-once guarantees streaming 
> queries provide? I can't say I know or understand how these semantics work. 
> Or are there other issues I can't think of this would cause?
>  * Is the change as simple as changing the unsupported operations check to 
> only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only 
> disallow global sorts and it seems to be working. Anything I'm missing or is 
> it this simple?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
