Hello folks,
I have a few questions:
Say I have a source like this:
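final DataStream<Data> data =
    env.fromSource(
        source,
        WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
            // event.timestamp is expected to be in epoch milliseconds
            .withTimestampAssigner((event, timestamp) -> event.timestamp),
        "my-source"); // fromSource also needs a source name; "my-source" is a placeholder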
My pipeline after this is as follows:
data.flatMap(new MyFlattendData())
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .reduce(new MyReducer());
My first question: would the timestamp I assign at the source be carried
through all the steps below, up to my window?
For example, say I have timestamped data from the source as:
=> [ (10, data1), (12, data2), (59, data3), (61, data4), ... ]
Would this get flattened to, say:
=> [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
then keyed to, say:
=> [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ... ]
and then assigned to windows:
1st => [ flatdata1, flatdata2 ]
2nd => [ flatdata4, ... ]
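To make the illustration above concrete, here is a plain-Java model (no Flink dependency) of how 60-second tumbling event-time windows would group the flattened records, assuming each record still carries the timestamp assigned at the source. Flink event-time timestamps are epoch milliseconds, so the seconds in my example are multiplied by 1000. The class and record names are hypothetical, and the window-start formula is my understanding of what Flink's TimeWindow.getWindowStartWithOffset computes, not code copied from Flink. The key is ignored because every record in the example has key1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of tumbling event-time window assignment (no Flink classes).
// Assumes each record still carries the timestamp assigned at the source.
final class TumblingWindowSketch {

    // Hypothetical record: event-time timestamp in epoch milliseconds plus a payload.
    record Timestamped(long timestampMillis, String value) {}

    // Start of the tumbling window containing the timestamp.
    // Assumed to match Flink's TimeWindow.getWindowStartWithOffset; negative
    // remainders are shifted so timestamps before the epoch also land correctly.
    static long windowStart(long timestampMillis, long offsetMillis, long windowSizeMillis) {
        long remainder = (timestampMillis - offsetMillis) % windowSizeMillis;
        return remainder < 0
                ? timestampMillis - (remainder + windowSizeMillis)
                : timestampMillis - remainder;
    }

    // Groups payloads by window start; TreeMap keeps windows in time order.
    static Map<Long, List<String>> assign(List<Timestamped> records, long windowSizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Timestamped r : records) {
            long start = windowStart(r.timestampMillis(), 0L, windowSizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(r.value());
        }
        return windows;
    }

    public static void main(String[] args) {
        // The flattened records from my example, seconds converted to milliseconds.
        List<Timestamped> flattened = List.of(
                new Timestamped(10_000L, "flatdata1"),
                new Timestamped(12_000L, "flatdata2"),
                new Timestamped(61_000L, "flatdata4"));
        System.out.println(assign(flattened, 60_000L));
    }
}
```

Running main prints {0=[flatdata1, flatdata2], 60000=[flatdata4]}, i.e. the two windows I drew above; data3 is absent only because my example flattening drops it.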
Would the windows created before the reduce function is applied look like
what I have illustrated, or, to get this behavior, do I need to emit each
record at every step with its timestamp explicitly assigned?
Basically, is the timestamp assigned when reading from the source pushed
down (retained) through all the downstream steps when doing an event-time
window operation?
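To pin down what I am asking, here is a plain-Java model of the behavior I am hoping for: flatMap stamps every record it emits with the timestamp of the input record that produced it, and keyBy only attaches a key without touching the timestamp. My understanding (not yet confirmed) is that Flink's flatMap does this internally, so no explicit re-assignment would be needed. The Element and Keyed records and the helper methods are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of timestamp propagation through flatMap and keyBy (no Flink classes).
final class TimestampPropagationSketch {

    // Hypothetical stand-in for a stream element: payload plus event-time timestamp (ms).
    record Element<T>(long timestampMillis, T value) {}

    // Hypothetical stand-in for a keyed element: key added, timestamp copied unchanged.
    record Keyed<K, T>(long timestampMillis, K key, T value) {}

    // Models flatMap: every output inherits the timestamp of the input that produced it,
    // so the user function never has to set a timestamp itself.
    static <I, O> List<Element<O>> flatMap(List<Element<I>> in, Function<I, List<O>> fn) {
        List<Element<O>> out = new ArrayList<>();
        for (Element<I> e : in) {
            for (O o : fn.apply(e.value())) {
                out.add(new Element<>(e.timestampMillis(), o));
            }
        }
        return out;
    }

    // Models keyBy: extracts a key and leaves the timestamp untouched.
    static <K, T> List<Keyed<K, T>> keyBy(List<Element<T>> in, Function<T, K> keySelector) {
        List<Keyed<K, T>> out = new ArrayList<>();
        for (Element<T> e : in) {
            out.add(new Keyed<>(e.timestampMillis(), keySelector.apply(e.value()), e.value()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Element<String>> source = List.of(
                new Element<>(10_000L, "data1"),
                new Element<>(12_000L, "data2"),
                new Element<>(59_000L, "data3"),
                new Element<>(61_000L, "data4"));
        // Hypothetical flattening: drop data3 as in my example, prefix the rest with "flat".
        List<Element<String>> flat = flatMap(source,
                v -> v.equals("data3") ? List.<String>of() : List.of("flat" + v));
        // Hypothetical key selector: every record maps to key1.
        List<Keyed<String, String>> keyed = keyBy(flat, v -> "key1");
        keyed.forEach(System.out::println);
    }
}
```

If a single input produced several outputs, each would get the same timestamp, which is what my windowed illustration assumes.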
My next question is about my watermark strategy: how do I set the
watermarking period?
Basically, the documentation says: "An out-of-order bound B means that once
an event with timestamp T was encountered, no events older than T - B will
follow any more," which I read as describing what holds once the
watermarking is done.
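This is how I would model the quoted rule in plain Java. It is a sketch, not Flink code, and it assumes (as I understand Flink's BoundedOutOfOrdernessWatermarks to work) that the generator tracks the maximum timestamp seen so far rather than the last one, and reports max - B - 1. The extra 1 ms means an event at exactly T - B is still on time, while anything older is late. The class name and the isLate helper are hypothetical.

```java
// Plain-Java model of a bounded-out-of-orderness watermark generator (no Flink classes).
final class BoundedOutOfOrdernessSketch {

    private final long boundMillis; // the out-of-order bound B
    private long maxTimestamp;      // largest event timestamp seen so far

    BoundedOutOfOrdernessSketch(long boundMillis) {
        if (boundMillis < 0) {
            throw new IllegalArgumentException("bound must not be negative");
        }
        this.boundMillis = boundMillis;
        // Chosen so that the watermark before any event is exactly Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Tracks the maximum timestamp, not the latest one, so a late event never lowers it.
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    // Watermark = max - B - 1: it asserts no further events with timestamp <= this value.
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    // Hypothetical helper: an event is late if the watermark already covers its timestamp.
    boolean isLate(long eventTimestampMillis) {
        return eventTimestampMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(60_000L);
        for (long ts : new long[] {10_000L, 12_000L, 59_000L, 61_000L}) {
            generator.onEvent(ts);
        }
        System.out.println(generator.currentWatermark()); // 61000 - 60000 - 1 = 999
    }
}
```

With B = 60 s and the example timestamps, the watermark ends at 999 ms, so an event stamped 1000 ms would still be accepted but one stamped 999 ms would be late.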
However, how frequently is watermarking done? And if, at the moment a
watermark is generated, the last event encountered had timestamp T, does
this mean the watermark's timestamp would be T - B?
How can we control the watermarking period?
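To frame the period question, here is a plain-Java simulation of periodic watermarking: between ticks the generator only updates its maximum timestamp, and on every tick of processing time it emits max - B - 1 if that advances the last watermark. intervalMillis stands in for the watermark period; I believe the corresponding Flink setting is ExecutionConfig.setAutoWatermarkInterval (config key pipeline.auto-watermark-interval, 200 ms by default), but that is an assumption I would like confirmed. The Arrival record, the processing times and the simulate method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of periodic watermark emission (no Flink classes).
final class PeriodicWatermarkSketch {

    // Hypothetical arrival: wall-clock time the event was processed, plus its event time.
    record Arrival(long processingTimeMillis, long eventTimestampMillis) {}

    // Every intervalMillis of processing time, emit max - B - 1 if it advances the last
    // emitted watermark. Arrivals must be sorted by processing time.
    static List<Long> simulate(List<Arrival> arrivals, long boundMillis,
                               long intervalMillis, long endMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        List<Long> emitted = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
        long lastEmitted = Long.MIN_VALUE;
        int next = 0;
        for (long tick = intervalMillis; tick <= endMillis; tick += intervalMillis) {
            // Between ticks, events only update the maximum timestamp.
            while (next < arrivals.size() && arrivals.get(next).processingTimeMillis() <= tick) {
                maxTimestamp = Math.max(maxTimestamp, arrivals.get(next).eventTimestampMillis());
                next++;
            }
            long watermark = maxTimestamp - boundMillis - 1;
            // Watermarks never move backwards, so an unchanged value is not re-emitted.
            if (watermark > lastEmitted) {
                emitted.add(watermark);
                lastEmitted = watermark;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Arrival> arrivals = List.of(
                new Arrival(50L, 10_000L),
                new Arrival(120L, 12_000L),
                new Arrival(350L, 59_000L),
                new Arrival(380L, 61_000L));
        System.out.println(simulate(arrivals, 60_000L, 200L, 600L));     // [-48001, 999]
        System.out.println(simulate(arrivals, 60_000L, 1_000L, 1_000L)); // [999]
    }
}
```

With a 200 ms interval the simulation emits -48001 and then 999; with a 1000 ms interval it emits only 999. In this model the period changes how often the watermark advances, not the value it converges to.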
Thanks
Sachin