[ https://issues.apache.org/jira/browse/SPARK-31376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077819#comment-17077819 ]
Jungtaek Lim commented on SPARK-31376:
--------------------------------------

I'm saying that sort is, in theory, simply not an available operation for streaming workloads. Please don't set aside the core concept of streaming: the input is "unbounded". Partition-level vs. global makes no difference, because even for a single partition it doesn't make sense to sort when the input is unbounded.

If we allowed sort within the micro-batch, the result of the same streaming query would differ depending on how the batches are constructed, which I think is not what "Structured Streaming" proposes. (That's why the concept of "state" had to be introduced.) And if we simply supported "operations within a micro-batch", then global sort wouldn't be a problem either.

The "same result" means the "output rows" Spark writes to the sink. It doesn't imply any physical guarantee such as order or partitioning. From that viewpoint the output would be the same between a batch query and a streaming query, and an operation is defined as "not supported" in Structured Streaming if it's impossible to do in a streaming query.

You wouldn't even get the chance to try sorting with a record-to-record streaming framework. It's only possible in a "micro-batch" manner, because a "micro-batch" does have "batch" characteristics. You can take advantage of that via foreachBatch. That's better than supporting this natively, because when you use foreachBatch you recognize that your operations are no longer tied to streaming semantics. Supporting this natively would lead to confusion when reasoning about the behavior.

> Also, I don't quite understand how repartitioning is valid on a streaming
> query, I was surprised when that worked. That seems more questionable than
> local sorting.

I'm not sure how you reached that conclusion. It's quite natural that repartitioning is valid on a streaming query: shuffle has been provided since the earliest generation of streaming frameworks. The only difference is in how shuffling is handled: push based vs. pull based. With a record-to-record streaming framework it's likely push based (as each output row should be sent downstream immediately), whereas with a batch-oriented streaming framework it's likely pull based (though it can be push based).

One more thing: the behavior of an operation must be consistent across data sources. If something makes sense for the file data source but not for the Kafka data source, the operation cannot be supported natively.

> Non-global sort support for structured streaming
> ------------------------------------------------
>
>                 Key: SPARK-31376
>                 URL: https://issues.apache.org/jira/browse/SPARK-31376
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.0
>            Reporter: Adam Binford
>            Priority: Minor
>
> Currently, all sorting is disallowed with structured streaming queries. Not
> allowing global sorting makes sense, but could non-global sorting (i.e.
> sortWithinPartitions) be allowed? I'm running into this with an external
> source I'm using, but not sure if this would be useful to file sources as
> well. I have to foreachBatch so that I can do a sortWithinPartitions.
> Two main questions:
>  * Does a local sort cause issues with any exactly-once guarantees streaming
> queries provide? I can't say I know or understand how these semantics work.
> Or are there other issues I can't think of this would cause?
>  * Is the change as simple as changing the unsupported operations check to
> only look for global sorts instead of all sorts?
> I have built a version that simply changes the unsupported check to only
> disallow global sorts and it seems to be working. Anything I'm missing or is
> it this simple?
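For readers who have not used it, the foreachBatch workaround mentioned in both the comment and the issue description might look roughly like the PySpark sketch below. It is only an illustration under stated assumptions, not code from the reporter or from Spark itself: the output path, the sort column, and the helper names write_sorted_batch and start_query are all hypothetical. The Spark calls it relies on (DataStreamWriter.foreachBatch, DataFrame.sortWithinPartitions, DataFrameWriter.mode and parquet) are existing APIs.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:  # used for type hints only; nothing is imported at runtime
    from pyspark.sql import DataFrame

# Hypothetical values for illustration; neither appears in the thread.
OUTPUT_PATH = "/tmp/sorted-output"
SORT_COLUMN = "key"


def write_sorted_batch(batch_df: DataFrame, batch_id: int) -> None:
    """Sort one micro-batch within each partition, then append it to the sink.

    Inside foreachBatch, batch_df is an ordinary bounded DataFrame, so
    batch-only operations such as sortWithinPartitions are allowed here.
    batch_id is unused in this sketch.
    """
    (
        batch_df.sortWithinPartitions(SORT_COLUMN)
        .write.mode("append")
        .parquet(OUTPUT_PATH)
    )


def start_query(stream_df: DataFrame):
    """Attach the per-batch writer to a streaming DataFrame and start the query."""
    return stream_df.writeStream.foreachBatch(write_sorted_batch).start()
```

The ordering this produces holds only within each micro-batch, so it depends on how batches happen to be formed, which is exactly the caveat raised in the comment. On the exactly-once question: foreachBatch gives at-least-once write guarantees by default, and the batch_id argument can be used to deduplicate output if stronger guarantees are needed.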