[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245556#comment-16245556 ]
Li Yuanjian commented on SPARK-20928:
-------------------------------------

Our team discussed the design sketch in detail; our ideas and questions are noted below.

1. Will window operations be supported in Continuous Processing Mode? Even if we only consider narrow dependencies for now, as the design sketch describes, the exactly-once guarantee may not be achievable with the current implementation of window and watermark.

2. Should the EpochIDs be aligned in scenarios that are not map-only?
{quote}
The design can also work with blocking operators, although it’d require the blocking operators to ensure epoch markers from all the partitions have been received by the operator before moving forward to commit.
{quote}
Does `blocking operators` mean 'operators that need a shuffle'? We think that only operators with an ordering relation (like window, mapState, sortByKey) need the EpochIDs aligned; others (like groupBy) do not.

3. In the many-to-one scenario (like shuffle and window), should we use a new EpochID in the shuffle read stage and on the window slide-out trigger, or use the original batch of EpochIDs?

> SPIP: Continuous Processing Mode for Structured Streaming
> ---------------------------------------------------------
>
>                 Key: SPARK-20928
>                 URL: https://issues.apache.org/jira/browse/SPARK-20928
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Michael Armbrust
>              Labels: SPIP
>         Attachments: Continuous Processing in Structured Streaming Design Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is
> bounded by the amount of time that it takes to launch a task. This
> limitation is a result of the fact that {{getBatch}} requires us to know both
> the starting and the ending offset, before any tasks are launched. In the
> worst case, the end-to-end latency is actually closer to the average batch
> time + task launching time.
> For applications where latency is more important than exactly-once output
> however, it would be useful if processing could happen continuously. This
> would allow us to achieve fully pipelined reading and writing from sources
> such as Kafka. This kind of architecture would make it possible to process
> records with end-to-end latencies on the order of 1 ms, rather than the
> 10-100 ms that is possible today.
> One possible architecture here would be to change the Source API to look like
> the following rough sketch:
> {code}
> trait Epoch {
>   def data: DataFrame
>   /** The exclusive starting position for `data`. */
>   def startOffset: Offset
>   /** The inclusive ending position for `data`. Incrementally updated
>     * during processing, but not complete until execution of the query plan in
>     * `data` is finished. */
>   def endOffset: Offset
> }
> def getBatch(startOffset: Option[Offset], endOffset: Option[Offset],
>     limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of
> {{StreamExecution}} that processes continuously with much lower latency and
> only stops processing when needing to reconfigure the stream (either due to a
> failure or a user-requested change in parallelism).
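
For reference on question 2 in the comment: the alignment that the quoted design passage asks of a blocking operator (wait until epoch markers from all the partitions have been received before moving forward to commit) could look roughly like the sketch below. This is only an illustration under stated assumptions. `EpochAligner`, `onMarker`, and the use of plain `Int` partition ids and `Long` epoch ids are hypothetical names chosen here; none of them come from Spark or from the design sketch.

```scala
import scala.collection.mutable

/** Hypothetical helper (not a Spark API): tracks which input partitions
  * have delivered the marker for each epoch. */
final class EpochAligner(numPartitions: Int) {
  require(numPartitions > 0, "need at least one input partition")

  // epoch id -> partitions whose marker for that epoch has already arrived
  private val seen = mutable.Map.empty[Long, mutable.Set[Int]]

  /** Records one marker. Returns true once every partition has reported
    * this epoch, i.e. the operator may move forward to commit it. */
  def onMarker(partition: Int, epoch: Long): Boolean = {
    require(partition >= 0 && partition < numPartitions, s"bad partition $partition")
    val parts = seen.getOrElseUpdate(epoch, mutable.Set.empty[Int])
    parts += partition // a duplicate marker from the same partition is a no-op
    if (parts.size == numPartitions) {
      seen.remove(epoch) // epoch is aligned; drop its bookkeeping
      true
    } else {
      false
    }
  }
}

object EpochAlignerDemo {
  def main(args: Array[String]): Unit = {
    val aligner = new EpochAligner(numPartitions = 2)
    println(aligner.onMarker(partition = 0, epoch = 1L)) // false: partition 1 still pending
    println(aligner.onMarker(partition = 1, epoch = 1L)) // true: epoch 1 can be committed
  }
}
```

Whether an operator such as groupBy could skip this step, as the comment suggests, is a separate design question that the sketch does not settle.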