Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-22 Thread Jacek Laskowski
Hi, Thanks a lot Burak for the explanation! I appreciate it a lot (and promise to share the great news far and wide once I get the gist of the internals myself). What I am missing is which part of Structured Streaming is responsible for enforcing the semantics of output modes. Once defined for a

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Burak Yavuz
Hi Jacek, The way the memory sink is architected at the moment is that it either appends a row (append/update mode) or replaces all rows (complete mode). When a user specifies a checkpoint location, the guarantee Structured Streaming provides is that output sinks will not lose data and will be
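
The two behaviours of the memory sink described above (append a row vs. replace all rows) can be sketched in plain Scala, with no Spark involved. This is only an illustration: `ToyMemorySink`, `Mode` and `addBatch` are hypothetical names made up for the sketch, not Spark's actual classes, and the real sink's internals may differ.

```scala
// Hypothetical stand-ins for the output modes; not Spark's OutputMode type.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode

// Toy sink that keeps its result table in memory.
final class ToyMemorySink {
  private var table: Vector[String] = Vector.empty

  def addBatch(mode: Mode, batch: Seq[String]): Unit = mode match {
    // Append/update: new rows are added to whatever is already stored.
    case Append | Update => table = table ++ batch
    // Complete: the stored table is replaced by the batch's full result.
    case Complete => table = batch.toVector
  }

  def rows: Vector[String] = table
}
```

In this sketch a sink fed in append mode can only grow, while one fed in complete mode always holds exactly the last batch's result.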

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Jacek Laskowski
Hi, This is what I could find in Spark's source code about the `recoverFromCheckpointLocation` flag (which led me to explore the complete output mode for the dropDuplicates operator). The `recoverFromCheckpointLocation` flag is enabled by default and varies per sink (memory, console and others). *

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Holden Karau
My assumption is that it would be similar, though: a memory sink holding all of your records would quickly overwhelm your cluster, but with an aggregation it could be reasonable. There might be additional reasons on top of that. On Fri, Aug 18, 2017 at 11:44 AM Holden Karau wrote: >

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Holden Karau
Ah yes I'm not sure about the workings of the memory sink. On Fri, Aug 18, 2017 at 11:36 AM, Jacek Laskowski wrote: > Hi Holden, > > Thanks a lot for a bit more light on the topic. That however does not > explain why memory sink requires Complete for a checkpoint location to >

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Jacek Laskowski
Hi Holden, Thanks a lot for shedding a bit more light on the topic. That, however, does not explain why the memory sink requires Complete for a checkpoint location to work. The only reason I used Complete output mode was to meet the requirements of the memory sink, and that got me thinking why would the

Re: [SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Holden Karau
So performing complete output without an aggregation would require building up a table of the entire input to write out at each micro batch. This would get prohibitively expensive quickly. With an aggregation we just need to keep track of the aggregates and update them every batch, so the memory
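
A rough way to see the cost difference described above is a plain-Scala simulation (no Spark involved). This is a sketch under stated assumptions: both classes and the `id % 10` grouping key are hypothetical and made up for illustration; they are not how Spark implements state.

```scala
import scala.collection.mutable

// Complete mode WITHOUT an aggregation (hypothetical): the result table is the
// entire input, so every row ever seen is retained and re-emitted each batch.
final class CompleteWithoutAggregation {
  private val rows = mutable.ArrayBuffer.empty[Long]
  def processBatch(batch: Seq[Long]): List[Long] = {
    rows ++= batch
    rows.toList // the full table is written out on every micro-batch
  }
  def stateSize: Int = rows.size
}

// Complete mode WITH an aggregation (hypothetical): only one running count per
// group is retained, however many rows have arrived.
final class CompleteWithAggregation {
  private val counts = mutable.Map.empty[Long, Long]
  def processBatch(batch: Seq[Long]): Map[Long, Long] = {
    batch.foreach { id =>
      val key = id % 10 // assumed grouping key, for illustration only
      counts(key) = counts.getOrElse(key, 0L) + 1L
    }
    counts.toMap // the full (small) aggregate table is written out
  }
  def stateSize: Int = counts.size
}

// Feed the same three micro-batches of 100 ids each to both variants.
val plain = new CompleteWithoutAggregation
val aggregated = new CompleteWithAggregation
(0 until 3).foreach { b =>
  val batch: Seq[Long] = (b * 100L) until ((b + 1) * 100L)
  plain.processBatch(batch)
  aggregated.processBatch(batch)
}
```

After the three batches, `plain.stateSize` equals the number of input rows, while `aggregated.stateSize` equals the number of distinct groups, which is the asymmetry the explanation above points at.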

[SS] Why is a streaming aggregation required for complete output mode?

2017-08-18 Thread Jacek Laskowski
Hi, Why does Complete output mode require a streaming aggregation in a streaming query? What would happen if Spark allowed Complete without a single aggregation? This is the latest master. scala> val q = ids. | writeStream. | format("memory"). | queryName("dups"). |