[ 
https://issues.apache.org/jira/browse/SPARK-41942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655891#comment-17655891
 ] 

Jungtaek Lim commented on SPARK-41942:
--------------------------------------

I see the value in enabling a bidirectional flow of information between the 
driver and the executors for a source. Do you have a concrete design in mind? 
One thing to keep in mind is that the offset range of a microbatch must still 
be planned on the driver side and persisted to the WAL before the microbatch 
is executed. This is the baseline of the fault-tolerance guarantee model.
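
To make the constraint concrete, here is a minimal sketch of "plan on the 
driver, persist, then execute". It is not Spark's implementation: OffsetLog, 
run_batch, plan and execute are hypothetical names, and a JSON file per batch 
stands in for the WAL.

```python
import json
import os
import tempfile


class OffsetLog:
    """Hypothetical stand-in for the offset write-ahead log (one file per batch)."""

    def __init__(self, directory):
        self.directory = directory

    def _path(self, batch_id):
        return os.path.join(self.directory, f"{batch_id}.json")

    def get(self, batch_id):
        # Return the previously planned range, or None if nothing was logged.
        path = self._path(batch_id)
        if not os.path.exists(path):
            return None
        with open(path) as f:
            return json.load(f)

    def add(self, batch_id, offset_range):
        # Persist the planned range before any task runs.
        with open(self._path(batch_id), "w") as f:
            json.dump(offset_range, f)


def run_batch(log, batch_id, plan, execute):
    """Plan on the driver, persist to the log, then execute."""
    planned = log.get(batch_id)
    if planned is None:
        planned = plan()            # driver decides {"start": ..., "end": ...}
        log.add(batch_id, planned)  # durable before execution starts
    # After a restart, the logged range is replayed instead of re-planned.
    return execute(planned["start"], planned["end"])
```

If the query restarts after the range was logged, run_batch reuses the logged 
range rather than calling plan again, so the retried batch covers the same 
offsets as the first attempt.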

I don't see how this would help support non-offset-based systems, though.

> Current microbatch model is insufficient to efficiently handle partitioned 
> logs with offsets
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-41942
>                 URL: https://issues.apache.org/jira/browse/SPARK-41942
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.2.3
>            Reporter: Daniel Collins
>            Priority: Major
>
> Spark microbatch only allows unidirectional flow of information from the 
> driver to the worker, which has multiple negative effects for partition and 
> offset based sources.
> If you have large gaps in the offsets between the current offset and the next 
> message (such as when there is garbage collection, compaction or a 
> newly-created subscription with a 0 offset seek point), the driver will need 
> to work through all of the empty ranges, despite the worker knowing each time 
> that the next message it will receive is at a much later offset.
> In addition, because the driver can only specify a range of work in terms of 
> offsets, the size of the range selected must be highly tuned to the size of 
> messages in the backlog; when the size changes or is variable, there can be 
> no correct selection that both allows high throughput and doesn't overload 
> the driver when messages are large.
> To resolve this, there needs to be a channel to communicate information about 
> what messages were processed back to the driver code. This would allow simply 
> resolving both of the above issues by 1) sending the offset of the last read 
> message back to the driver and 2) structuring reads as <start offset, head 
> offset, byte limit> instead of <start offset, end offset>, which would allow 
> repeatable reads that also can respond to messages of different sizes by 
> making different sized batches, and would allow skipping large offset gaps.
> This would have the added advantage of allowing non-offset based systems 
> (such as Google Pub/Sub) to implement the microbatch API, by propagating 
> information about which messages were read back to the driver code.
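
For illustration, the read shape proposed in the quoted description, 
<start offset, head offset, byte limit>, could be sketched as below. This is 
only a sketch under assumptions: ReadRequest and read_batch are hypothetical 
names, the partition is modelled as an in-memory list of (offset, payload) 
pairs sorted by offset, and messages below the head offset are assumed not to 
change between attempts.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReadRequest:
    start_offset: int  # first offset eligible for this batch (inclusive)
    head_offset: int   # upper bound fixed at planning time (exclusive)
    byte_limit: int    # maximum payload bytes for this batch


def read_batch(log, request):
    """Read from `log`, a list of (offset, payload) sorted by offset with gaps.

    Returns (messages, last_read_offset); last_read_offset is None when no
    message falls inside the requested range.
    """
    batch = []
    used = 0
    last = None
    for offset, payload in log:
        if offset < request.start_offset:
            continue
        if offset >= request.head_offset:
            break
        # Always admit the first message so an oversized one cannot stall reads.
        if batch and used + len(payload) > request.byte_limit:
            break
        batch.append((offset, payload))
        used += len(payload)
        last = offset
    return batch, last
```

With a log holding offsets 0, 1, 1000 and 1001, a request starting at offset 2 
returns the message at offset 1000 directly, and the returned last-read offset 
is what would be reported back so the next batch can start after it instead of 
walking the empty range.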



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
