Hi all

I'm fairly new to Spark Structured Streaming and only starting to develop an 
understanding of watermark handling.

Our application reads data from a Kafka input topic, and as one of the first 
steps it has to group the incoming messages. The messages arrive in bursts, 
e.g. 5 messages belonging to the same "business event" (sharing a common key), 
with event timestamps differing by only a few milliseconds. Then no messages 
for, say, 3 minutes, and after that another burst of 3 messages with very 
close event timestamps.

I have set a watermark of 20 seconds on my streaming query, combined with a 
groupBy on the shared common key and a 20-second window sliding every 10 
seconds. So something like

    df = (inputStream
        .withWatermark("eventtime", "20 seconds")
        .groupBy("sharedId", window("eventtime", "20 seconds", "10 seconds")))

The output mode is set to append, since I intend to join this stream with 
other streams later in the application.

Naively, I would have expected each incoming burst to show up on the output 
stream as an aggregated record roughly 20 seconds after its event time. But my 
observations indicate that the latest burst always stays queued inside the 
query until a new burst arrives and bumps up the watermark. In my example 
above, this means I see the first burst only after 3 minutes, when the second 
burst comes in.

This does indeed make some sense, and if I understand the documentation 
correctly, the watermark is only ever updated upon arrival of new input: it is 
derived from the maximum event time seen so far, minus the configured delay. 
"Real" wall-clock time plays no role in advancing the watermark.

But to me this means that any burst of events is prevented from being sent 
downstream until a new burst comes in. This is not what I intended.
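
To make the arithmetic concrete, here is what I believe happens with the 
numbers from my example (watermark delay 20 s, windows of 20 s sliding by 10 s; 
times illustrative):

    12:00:00  burst 1 arrives (5 messages, event times ~12:00:00)
              -> max event time 12:00:00, watermark = 12:00:00 - 20 s = 11:59:40
              -> the windows covering 12:00:00 end at 12:00:10 and 12:00:20,
                 both after the watermark, so append mode emits nothing
    12:03:00  burst 2 arrives (3 messages, event times ~12:03:00)
              -> watermark advances to 12:02:40
              -> burst 1's windows now end before the watermark and are
                 finally emitted, ~3 minutes "late"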

Is my understanding more or less correct? And is there any way to bring 
wall-clock time into the calculation of the watermark, short of producing 
regular dummy messages which are then filtered out again?
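
For reference, the dummy-message workaround I would like to avoid would look 
roughly like the following: a small side producer that periodically writes a 
heartbeat record carrying the current wall-clock time into the same topic, so 
the watermark keeps advancing between real bursts. This sketch uses the 
kafka-python client; the broker, topic and the "heartbeat" sentinel key are 
placeholders matching the assumed layout above.

    import json
    import time
    from datetime import datetime, timezone

    from kafka import KafkaProducer  # kafka-python; any producer client would do

    producer = KafkaProducer(
        bootstrap_servers="broker:9092",  # placeholder
        value_serializer=lambda m: json.dumps(m).encode("utf-8"),
    )

    while True:
        # Heartbeat carrying the current wall-clock time as its event time,
        # so the query's watermark keeps moving even without real events.
        producer.send("events", {
            "sharedId": "heartbeat",  # sentinel key, dropped again in the query
            "eventtime": datetime.now(timezone.utc).isoformat(),
        })
        producer.flush()
        time.sleep(10)  # comfortably below the 20-second watermark delay

On the query side the heartbeats would then have to be filtered out again, 
e.g. with a filter(col("sharedId") != "heartbeat") before the groupBy, which 
is exactly the kind of plumbing I was hoping to avoid.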

-- 
CU, Joe
