[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220694#comment-16220694 ]
Cody Koeninger commented on SPARK-20928:
----------------------------------------
Can you clarify how this affects sinks' access to the underlying Kafka
offsets? See, e.g., https://issues.apache.org/jira/browse/SPARK-18258
> SPIP: Continuous Processing Mode for Structured Streaming
> ---------------------------------------------------------
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Michael Armbrust
> Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is
> bounded by the amount of time that it takes to launch a task. This
> limitation is a result of the fact that {{getBatch}} requires us to know both
> the starting and the ending offset before any tasks are launched. In the
> worst case, the end-to-end latency is actually closer to the average batch
> time + task launching time.
> For applications where latency is more important than exactly-once output,
> however, it would be useful if processing could happen continuously. This
> would allow us to achieve fully pipelined reading and writing from sources
> such as Kafka. This kind of architecture would make it possible to process
> records with end-to-end latencies on the order of 1 ms, rather than the
> 10-100 ms that is possible today.
> One possible architecture here would be to change the Source API to look like
> the following rough sketch:
> {code}
> trait Epoch {
>   def data: DataFrame
>
>   /** The exclusive starting position for `data`. */
>   def startOffset: Offset
>
>   /**
>    * The inclusive ending position for `data`. Incrementally updated during
>    * processing, but not complete until execution of the query plan in `data`
>    * is finished.
>    */
>   def endOffset: Offset
> }
>
> def getBatch(
>     startOffset: Option[Offset],
>     endOffset: Option[Offset],
>     limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of
> {{StreamExecution}} that processes continuously with much lower latency and
> only stops processing when needing to reconfigure the stream (either due to a
> failure or a user-requested change in parallelism).
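
To make the question about sinks and offsets concrete, here is a minimal, Spark-free sketch of the quoted Epoch idea. It is only an illustration under stated assumptions: LongOffset, InMemoryEpoch and OffsetAwareSink are hypothetical names, a plain Iterator stands in for the DataFrame, and nothing here reflects an actual or planned Spark API. It shows an end offset that advances as records are consumed, and a sink that can read the final offset range only after it has drained the epoch.

```scala
// Hypothetical illustration only: none of these types exist in Spark.
final case class LongOffset(value: Long)

/** Simplified stand-in for the proposed Epoch; an Iterator replaces the DataFrame. */
final class InMemoryEpoch(val startOffset: LongOffset, records: Iterator[String]) {
  @volatile private var current: LongOffset = startOffset

  /** Inclusive ending position; only final once `data` has been fully consumed. */
  def endOffset: LongOffset = current

  /** Records after `startOffset` (exclusive); each read advances `endOffset` by one. */
  val data: Iterator[String] = records.map { record =>
    current = LongOffset(current.value + 1)
    record
  }
}

/** A sink that remembers the offset range it wrote (assumed design, not Spark's). */
final class OffsetAwareSink {
  val written = scala.collection.mutable.ArrayBuffer.empty[String]
  var committedRange: Option[(LongOffset, LongOffset)] = None

  def write(epoch: InMemoryEpoch): Unit = {
    // Drain the epoch first; the end offset is not complete before this finishes.
    epoch.data.foreach(written += _)
    committedRange = Some((epoch.startOffset, epoch.endOffset))
  }
}

object EpochSketch {
  def main(args: Array[String]): Unit = {
    val epoch = new InMemoryEpoch(LongOffset(41), Iterator("a", "b", "c"))
    val sink = new OffsetAwareSink
    sink.write(epoch)
    println(sink.committedRange)
  }
}
```

In this sketch the sink sees the range (41, 44] only after writing all three records, which is the crux of the question: with a continuously updated end offset, a sink cannot know the final offset up front the way it can with a fixed batch.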