[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202924#comment-16202924 ] Reynold Xin commented on SPARK-20928: - OK got it - you are basically saying if we can send the offset associated with each record to the sink, then the sink can potentially implement some sort of dedup to guarantee idempotency. For most sinks this probably won't work, but if a particular sink offers a way to do it, then end-to-end exactly once can be accomplished. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202923#comment-16202923 ] Cody Koeninger commented on SPARK-20928: If a given sink is handling a result, why does handling the corresponding offset to the result substantially increase overhead? Thinking about it in terms of a downstream database, if I'm doing a write per result, then the difference between writing (result) and writing (result, offset) seems like it should be overshadowed by the overall cost of the write. In more practical terms, poll() on the kafka consumer is returning a batch of pre-fetched messages anyway, not a single message, so one should be able to run their straight line map/filter/whatever on the batch and then commit results with the last offset. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202906#comment-16202906 ] Reynold Xin commented on SPARK-20928: - Isn't there an issue with the overhead of tracking in the sinks? If we need to guarantee exactly once, then each record needs to be committed. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202905#comment-16202905 ] Cody Koeninger commented on SPARK-20928: I was talking about the specific case of jobs with only narrow stages. If there's no shuffle, then it should be sufficient at any given point to record the per-partition offset alongside the result. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202621#comment-16202621 ] Reynold Xin commented on SPARK-20928: - [~c...@koeninger.org] can you write down your thoughts on how we should maintain exactly once semantics? > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16148227#comment-16148227 ] Michael Armbrust commented on SPARK-20928: -- Hey everyone, thanks for your interest in this feature! I'm still targeting Spark 2.3, but unfortunately have been busy with other things since the summit. Will post more details on the design as soon as we have them! > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137123#comment-16137123 ] Tomasz Gawęda commented on SPARK-20928: --- [~marmbrus] Is it still planned for 2.3.0? > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080485#comment-16080485 ] Ihor Bobak commented on SPARK-20928: Dear All, on this video https://www.youtube.com/watch?v=qAZ5XUz32yM Michael told "I am switching to new mode of continuous processing". So, as I understand, it is already done, or almost done. Can anyone give a hint of how to switch on this mode? Thanks > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054690#comment-16054690 ] Cody Koeninger commented on SPARK-20928: Cool, can you label it SPIP so it shows up linked from http://spark.apache.org/improvement-proposals.html My only concern so far was the one I mentioned already, namely that it seems like you shouldn't have to give up exactly-once delivery semantics in all cases. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054596#comment-16054596 ] Michael Armbrust commented on SPARK-20928: -- Hi Cody, I do plan to flesh this out with the other sections of the SIP document and will email the dev list at that point. All that has been done so far is some basic prototyping to estimate how much work an alternative {{StreamExecution}} would take to build, and some experiments to validate the latencies that this arch could achieve. Do you have specific concerns with the proposal as it stands? > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16052831#comment-16052831 ] Cody Koeninger commented on SPARK-20928: This needs an improvement proposal. Based on discussions on the mailing list and representations made at Spark Summit, this is already a "work in progress". If that's the case, it needs community involvement now, not later, to reduce the danger of Databricks developing something that meets their internal needs but not community needs. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16040516#comment-16040516 ] Wladimir Schmidt commented on SPARK-20928: -- I am really looking forward for it! This sub millisecond streaming is really exciting. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16036157#comment-16036157 ] Nan Zhu commented on SPARK-20928: - if I understand correctly the tasks will be "long-term" tasks just like the receiver tasks in receiver-based InputDStream in Spark Streaming? > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035964#comment-16035964 ] Cody Koeninger commented on SPARK-20928: For jobs that only have narrow stages, I think it should be possible to maintain delivery semantics as well. > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030379#comment-16030379 ] Nan Zhu commented on SPARK-20928: - Hi, is there any description on what does it mean? > Continuous Processing Mode for Structured Streaming > --- > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org