Hi Anastasios

On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
> Hi Joe,
>
> How often do you trigger your mini-batch? Maybe you can set the trigger
> time explicitly to a low value, or even better switch it off.
>
> See:
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
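For reference, the trigger is configured on the write side of the query. A minimal PySpark sketch (not from the thread; it assumes a streaming DataFrame `df` from the Kafka source is already defined, and uses the console sink purely for illustration):

```python
def start_query(df):
    """Sketch only: start the streaming DataFrame `df` with an explicit
    10-second processing-time trigger, writing results to the console sink."""
    return (df.writeStream
              .trigger(processingTime="10 seconds")  # micro-batch fires every 10s
              .outputMode("append")
              .format("console")
              .start())
```

Note that each fired micro-batch produces a log summary, but an empty batch does not advance the watermark, which is the behaviour discussed below.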
I tried different values for the trigger and settled on 10 seconds. I can see
in the logs that this actually works: it writes a mini-batch summary to the
log every 10 seconds. In those log entries I also see that the watermark does
not progress if no new data comes in. This is how I arrived at my suspicion
about how it works internally. I understand that it is quite uncommon to have
such "slowly moving topics", but unfortunately that is my use case.

> On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch <mailto:j...@pyx.ch>> wrote:
>
> Hi all
>
> I'm fairly new to Spark structured streaming and I'm only starting to
> develop an understanding of the watermark handling.
>
> Our application reads data from a Kafka input topic and, as one of the
> first steps, has to group incoming messages. Those messages come in bulks,
> e.g. 5 messages which belong to the same "business event" (sharing a common
> key), with event timestamps differing by only a few milliseconds. Then no
> messages for, say, 3 minutes, and after that another bulk of 3 messages
> with very close event timestamps.
>
> I have set a watermark of 20 seconds on my streaming query, and a groupBy
> on the shared common key with a window of 20 seconds (sliding by 10
> seconds). Something like
>
> df = inputStream.withWatermark("eventtime", "20 seconds") \
>        .groupBy("sharedId", window("eventtime", "20 seconds", "10 seconds"))
>
> The output mode is set to append, since I intend to join this stream
> with other streams later in the application.
>
> Naively, I would have expected to see each incoming bulk of messages as an
> aggregated message on the output stream roughly 20 seconds after its event
> time. But my observations indicate that the latest bulk of events always
> stays queued inside the query until a new bulk of events arrives and bumps
> up the watermark. In my example above, this means that I see the first bulk
> of events only after 3 minutes, when the second bulk comes in.
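The behaviour described can be illustrated with a plain-Python simulation (no Spark; the class and the integer timestamps are made up for illustration) of how the watermark is derived: it is the maximum event time seen so far minus the configured delay, so it only moves when new rows arrive, never with wall-clock time.

```python
class WatermarkTracker:
    """Toy model of Structured Streaming's watermark bookkeeping."""

    def __init__(self, delay_s):
        self.delay_s = delay_s
        self.max_event_time = None

    def observe(self, event_time_s):
        # Called for each arriving row; only this can move the watermark.
        if self.max_event_time is None or event_time_s > self.max_event_time:
            self.max_event_time = event_time_s

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay_s


tracker = WatermarkTracker(delay_s=20)

# First bulk of events around t=100 (event times in seconds).
for t in (100, 101, 102):
    tracker.observe(t)
print(tracker.watermark)  # 82: the window closing at ~120 stays open

# Empty micro-batches keep firing every 10s of wall-clock time, but with no
# new rows observe() is never called, so the watermark stays stuck at 82.

# A second bulk 3 minutes later finally pushes the watermark past 120,
# and only then is the first bulk's aggregate emitted in append mode.
for t in (280, 281, 283):
    tracker.observe(t)
print(tracker.watermark)  # 263
```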
> This does indeed make some sense, and if I understand the documentation
> correctly, the watermark is only ever updated upon arrival of new input.
> The "real time" plays no role in the setting of the watermark.
>
> But to me this means that any bulk of events is prevented from being
> sent downstream until a new bulk comes in. This is not what I intended.
>
> Is my understanding more or less correct? And is there any way of
> bringing "the real time" into the calculation of the watermark (short of
> producing regular dummy messages which are then filtered out again)?
>
> --
> Anastasios Zouzias
> <mailto:a...@zurich.ibm.com>

--
CU, Joe

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
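The dummy-message workaround mentioned above can be sketched in plain Python (the 20-second delay and the heartbeat timestamps are made-up numbers): periodic heartbeat events whose timestamps track wall-clock time keep the watermark moving while the real topic is idle, and the heartbeat rows are filtered out again downstream.

```python
def watermark(event_times, delay_s=20):
    """Watermark after observing event_times (seconds): max seen minus delay."""
    return max(event_times) - delay_s


real = [100, 101, 102]              # one bulk, then the topic goes quiet
print(watermark(real))              # 82: the window ending at 120 stays open

heartbeats = [150, 210]             # dummy events carrying wall-clock time
print(watermark(real + heartbeats)) # 190: well past 120, aggregate emitted
```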