Hello folks,
I have a few questions:

Say I have a source like this:

final DataStream<Data> data =
    env.fromSource(
        source,
        WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
            .withTimestampAssigner((event, timestamp) -> event.timestamp));


My pipeline after this is as follows:

    data.flatMap(new MyFlattendData())
        .keyBy(new MyKeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .reduce(new MyReducer());


My first question: is the timestamp I assign at the source carried through
all of the steps below, up to my window?
For example, say I have timestamped data from the source as:
=> [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]

Would this get flattened to, say:
=> [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]

then keyed to say:
=> [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ... ]

windows:
1st => [ flatdata1, flatdata2 ]
2nd => [ flatdata4, ... ]

Would the windows created before the reduce function is applied look like
what I have illustrated? Or, to get this behavior, do I need to output a
record at each step together with the timestamp assigned to that record?

Basically, is the timestamp assigned when reading from the source retained
(pushed down) through all of the steps below when doing an event-time window
operation?
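
To make my expectation concrete, here is a small plain-Java sketch of the
window assignment I am assuming in the illustration above. It is only a model
of what I expect, not Flink code: the class and method names are hypothetical,
timestamps are in seconds as in my example (Flink itself works in
milliseconds), and it assumes each flattened record keeps the timestamp of the
source record it came from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of tumbling event-time window assignment (not Flink code).
class TumblingWindowSketch {

    // Start of the tumbling window that contains `timestamp`
    // (no window offset, non-negative timestamps assumed).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Groups record names by the start of the window their timestamp falls into.
    static Map<Long, List<String>> assign(long[] timestamps, String[] names, long size) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            windows.computeIfAbsent(windowStart(timestamps[i], size), k -> new ArrayList<>())
                   .add(names[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        // The flattened records from my example, with their source timestamps.
        long[] timestamps = {10, 12, 61};
        String[] names = {"flatdata1", "flatdata2", "flatdata4"};
        // prints {0=[flatdata1, flatdata2], 60=[flatdata4]}
        System.out.println(assign(timestamps, names, 60));
    }
}
```

So with a 60-second window I would expect flatdata1 and flatdata2 in the
window starting at 0 and flatdata4 in the window starting at 60, provided the
timestamps really are retained.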


My next question is about my watermark strategy: how do I set the period of
the watermarking?
As I understand it, an out-of-order bound B means that once an event with
timestamp T has been encountered, no events older than T - B will follow any
more once the watermark has been emitted.

However, how frequently is watermarking done? And if, at the time a watermark
is generated, the last encountered event had timestamp T, does this mean the
watermark timestamp would be T - B?

How can we control the watermarking period?
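
To make this question concrete, here is how I picture a periodic
bounded-out-of-orderness generator, as a plain-Java sketch. This is a
simplified model with hypothetical class and method names, not Flink's actual
implementation. My understanding, which I would appreciate having confirmed,
is that the built-in generator tracks the maximum timestamp seen so far and
emits max - B - 1 (in milliseconds) each time the periodic emit fires, and
that the firing interval is the auto-watermark interval set through
env.getConfig().setAutoWatermarkInterval(...).

```java
// Hypothetical model of a periodic bounded-out-of-orderness watermark generator.
class BoundedOutOfOrdernessSketch {

    private final long boundMillis;   // the out-of-order bound B
    private long maxTimestamp;        // largest event timestamp seen so far

    BoundedOutOfOrdernessSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start high enough that the first watermark does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Called for every event; only the maximum timestamp is remembered.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called once per watermark interval; returns the watermark to emit.
    long onPeriodicEmit() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(60_000);
        generator.onEvent(10_000);
        generator.onEvent(61_000);
        generator.onEvent(59_000);   // out of order, does not lower the maximum
        // prints 999 (61000 - 60000 - 1)
        System.out.println(generator.onPeriodicEmit());
    }
}
```

If this model is right, the watermark depends on the largest timestamp seen so
far rather than on the last event, and it only advances when the periodic emit
fires.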

Thanks
Sachin
