Burak, thanks for the resources. I had been thinking that the trigger interval and the sliding window were the same thing, but now I am confused. I didn't know there was a .trigger() method, since the official Programming Guide <https://spark.apache.org/docs/2.1.1/structured-streaming-programming-guide.html> doesn't even mention it(!).
Calling .trigger(ProcessingTime("1 minute")) made the program generate output at most once each minute, instead of 2-3 times per minute. However, I'm still unable to understand what my program is outputting. For instance, in a single triggering, this is part of the output within the same partition/version (out of ~1000 items):

...
(15:44:29, 15:44:30) 275
(15:44:30, 15:44:30) 259
(15:44:30, 15:44:30) 261
...

- Why are there *multiple outputs for the same window*?
- Why do *almost all* windows have zero length (equal begin and end timestamps)?

Additionally, what is the use of a sliding window?

Thanks,

Eduardo

2017-09-11 13:11 GMT-03:00 Burak Yavuz <brk...@gmail.com>:

> Hi Eduardo,
>
> What you have written out is to output counts "as fast as possible" for
> windows of 5-minute length and with a sliding interval of 1 minute. So a
> record at 10:13 would be included in the counts for 10:09-10:14,
> 10:10-10:15, 10:11-10:16, 10:12-10:17, and 10:13-10:18.
>
> Please take a look at the following blog post for more details:
> https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
> This talk can also be helpful:
> https://www.youtube.com/watch?v=JAb4FIheP28&t=942s (especially after the
> 19th minute)
>
> What you seem to be looking for is the "Update" output mode (you may need
> Spark 2.2 for this, IIRC), with a window duration of 5 minutes, no
> sliding interval, and a processing-time trigger of 1 minute. Note that
> this still doesn't guarantee one output row per trigger, since late data
> may arrive (unless you set the watermark accordingly).
>
> Best,
> Burak
>
> On Mon, Sep 11, 2017 at 8:04 AM, Eduardo D'Avila <
> eduardo.dav...@corp.globo.com> wrote:
>
>> Hi,
>>
>> I'm trying to use Spark 2.1.1 Structured Streaming to *count the number
>> of records* from Kafka *for each time window*, with the code in this
>> GitHub gist
>> <https://gist.github.com/erdavila/b6ab0c216e82ae77fa8192c48cb816e4>.
>>
>> I expected that, *once each minute* (the slide duration), it would
>> *output a single record* (since the only aggregation key is the window)
>> with the *record count for the last 5 minutes* (the window duration).
>> However, it outputs several records 2-3 times per minute, as in the
>> sample output included in the gist.
>>
>> Changing the output mode to "append" seems to change the behavior, but
>> it is still far from what I expected.
>>
>> What is wrong with my assumptions about the way it should work? Given
>> the code, how should the sample output be interpreted or used?
>>
>> Thanks,
>>
>> Eduardo
>>
>
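[Editor's note] To make the sliding-window arithmetic in Burak's reply concrete, here is a minimal sketch in plain Python. This is not Spark code; `sliding_windows` is a hypothetical helper that mimics how Spark's `window()` function assigns an event time to windows (starts aligned to multiples of the slide interval), under that assumption only.

```python
from datetime import datetime, timedelta

def sliding_windows(event_time, window, slide):
    """Return every [start, end) window of length `window`, advancing by
    `slide`, that contains `event_time`. Window starts are aligned to
    multiples of `slide` since the epoch."""
    epoch = datetime(1970, 1, 1)
    # Latest aligned window start at or before the event time.
    last_start = event_time - (event_time - epoch) % slide
    windows = []
    start = last_start
    # Walk back one slide at a time while the window still covers the event.
    while start > event_time - window:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)

record = datetime(2017, 9, 11, 10, 13)

# Sliding case: 5-minute windows every 1 minute -> the record lands in
# 5 overlapping windows (10:09-10:14 through 10:13-10:18).
for s, e in sliding_windows(record, timedelta(minutes=5), timedelta(minutes=1)):
    print(s.strftime("%H:%M"), "-", e.strftime("%H:%M"))

# Tumbling case (what the "Update mode, no sliding interval" suggestion
# gives you): slide == window -> exactly one window per record.
print(sliding_windows(record, timedelta(minutes=5), timedelta(minutes=5)))
```

With slide == window each record is counted exactly once per window, which is why dropping the sliding interval yields the single-row-per-window behavior Eduardo expected.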