Hi Anastasios

On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
> Hi Joe,
> 
> How often do you trigger your mini-batch? Maybe you can specify the trigger 
> time explicitly to a low value or even better set it off.
> 
> See: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

I tried different values for the trigger, and settled on 10 seconds. I can see 
in the logs that this actually works (it outputs a mini-batch summary in the 
log every 10 seconds).

There in these log entries I also see that the watermark does not progress, if 
no new data is coming in. This is how I came to my suspsicion on how it works 
internally.

I understand that it is quite uncommon to have such "slowly moving topics", but 
unfortunately in my use case I have them.

> On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch <mailto:j...@pyx.ch>> 
> wrote:
> 
>     Hi all
> 
>     I'm fairly new to Spark structured streaming and I'm only starting to 
> develop an understanding for the watermark handling.
> 
>     Our application reads data from a Kafka input topic and as one of the 
> first steps, it has to group incoming messages. Those messages come in bulks, 
> e.g. 5 messages which belong to the same "business event" (share a common 
> key), with event timestamps differing in only a few millisecs. And then no 
> messages for say 3 minutes. And after that another bulk of 3 messages with 
> very close event timestamps.
> 
>     I have set a watermark of 20 seconds on my streaming query, and a groupBy 
> on the shared common key, and a window of 20 seconds (10 seconds sliding). So 
> something like
> 
>         df = inputStream.withWatermark("eventtime", "20 
> seconds").groupBy("sharedId", window("20 seconds", "10 seconds")
> 
>     The output mode is set to append, since I intend to join this streams 
> with other streams later in the application.
> 
>     Naively, I would have expected to see any incoming bulk of messages as an 
> aggregated message ~20 seconds after it's eventtime on the output stream. But 
> my observations indicate that the "latest bulk of events" always stays queued 
> inside the query, until a new bulk of events arrive and bump up the 
> watermark. In my example above, this means that I see the first bulk of 
> events only after 3 minutes, when the second bulk comes in.
> 
>     This does indeed make some sense, and if I understand the documentation 
> correctly the watermark is only ever updated upon arrival of new inputs. The 
> "real time" does not play a role in the setting of watermarks.
> 
>     But to me this means that any bulk of events is prohibited from being 
> sent downstreams until a new bulk comes in. This is not what I intended.
> 
>     Is my understanding more or less correct? And is there any way of 
> bringing "the real time" into the calculation of the watermark (short of 
> producing regular dummy messages which are then again filtered out).
> -- 
> -- Anastasios Zouzias
> <mailto:a...@zurich.ibm.com>


-- 
CU, Joe

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to