Hi,

Watermarks are meta events that travel independently of data events.

1) If you assingTimestampsAndWatermarks before keyBy, all parallel
instances of trips have some data(this is my assumption) so Watermarks
can be generated. Afterwards even if some of the keyed partitions have
no data, Watermarks are broadcasted/forwarded anyway. In other words if
at some point Watermarks were generated for all partitions of a single
stage, they will be forwarded beyond this point.

2) If you assingTimestampsAndWatermarks after keyBy, you try to assign
watermarks for an empty partition which produces no Watermarks at all
for this partition, therefore there is no progress beyond this point.

I hope this clarifies it a bit.

Best,

Dawid

On 25/04/2019 16:49, an0 wrote:
> If my understanding is correct, then why `assignTimestampsAndWatermarks` 
> before `keyBy` works? The `timeWindowAll` stream's input streams are task 1 
> and task 2, with task 2 idling, no matter whether 
> `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether 
> task 2 receives elements only depends on the key distribution, has nothing to 
> do with timestamp assignment, right?
>
>                                                                               
>           /key 1 trips\
>                                                                               
>         /                    \  
> (A) trips--> assignTimestampsAndWatermarks-->keyBy                    
> timeWindowAll
>                                                                               
>         \       idle        /
>                                                                               
>           \key 2 trips/
>
>                            /key 1 trips--> assignTimestampsAndWatermarks\
>                          /                                                    
>                              \  
> (B) trips-->keyBy                                                             
>                     timeWindowAll
>                          \       idle                                         
>                             /
>                            \key 2 trips--> assignTimestampsAndWatermarks/
>
> How things are different between A and B from `timeWindowAll`'s perspective?
>
> BTW, thanks for the webinar link, I'll check it later.
>
> On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakow...@apache.org> wrote: 
>> Hi,
>>
>> Yes I think your explanation is correct. I can also recommend Seth's
>> webinar where he talks about debugging Watermarks[1]
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
>>
>> On 22/04/2019 22:55, an0 wrote:
>>> Thanks, I feel I'm getting closer to the truth. 
>>>
>>> So parallelism is the cause? Say my parallelism is 2. Does that mean I get 
>>> 2 tasks running after `keyBy` if even all elements have the same key so go 
>>> to 1 down stream(say task 1)? And it is the other task(task 2) with no 
>>> incoming data that caused the `timeWindowAll` stream unable to progress? 
>>> Because both task 1 and task 2 are its input streams and one is idling so 
>>> its event time cannot make progress?
>>>
>>> On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote: 
>>>> HI,
>>>>
>>>> BoundedOutOfOrdernessTimestampExtractors can send a WM at least after it
>>>> receives an element.
>>>>
>>>> For after Keyby:
>>>> Flink uses the HashCode of key and the parallelism of down stream to decide
>>>> which subtask would receive the element. This means if your key is always
>>>> same, all the sources will only send the elements to the same down stream
>>>> task, for example only no. 3 BoundedOutOfOrdernessTimestampExtractor.
>>>>
>>>> For before Keyby:
>>>> In your case, the Source and BoundedOutOfOrdernessTimestampExtractors would
>>>> be chained together, which means every
>>>> BoundedOutOfOrdernessTimestampExtractors will receive elements.
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> an0 <an0...@gmail.com> 于2019年4月19日周五 下午10:41写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
>>>>> still don't understand why it doesn't work without calling `shuffle()`.
>>>>>
>>>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
>>>>> All the trips has keys and timestamps. As I said in my reply to Paul, I 
>>>>> see
>>>>> the same watermarks being extracted.
>>>>>
>>>>> How could calling `assignTimestampsAndWatermarks` before VS after `keyBy`
>>>>> matter? My understanding is any specific window for a specific key always
>>>>> receives the exactly same data, and the calling order of
>>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
>>>>>
>>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
>>>>> return the same key so that there is only 1 keyed stream and it is exactly
>>>>> the same as the original unkeyed stream. It still doesn't trigger windows:
>>>>> ```java
>>>>> DataStream<Trip> trips = env.addSource(consumer);
>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
>>>>> DataStream<Trip> featurizedUserTrips =
>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
>>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>     @Override
>>>>>     public long extractTimestamp(Trip trip) {
>>>>>         return trip.endTime.getTime();
>>>>>     }
>>>>> });
>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>>> featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>         Time.days(1));
>>>>> ```
>>>>>
>>>>> It makes no sense to me. Please help me understand why it doesn't work.
>>>>> Thanks!
>>>>>
>>>>> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
>>>>>> Hi,
>>>>>> After keyby maybe only some of BoundedOutOfOrdernessTimestampExtractors
>>>>>> could receive the elements(trip). If that is the case
>>>>>> BoundedOutOfOrdernessTimestampExtractor, which does not receive element
>>>>>> would not send the WM. Since that the timeWindowAll operator could not be
>>>>>> triggered.
>>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in
>>>>> your
>>>>>> second case and check if the window is triggered.  If it could be
>>>>> triggered
>>>>>> you could check the distribution of elements generated by the source.
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> an0...@gmail.com <an0...@gmail.com> 于2019年4月19日周五 上午4:10写道:
>>>>>>
>>>>>>> I don't think it is the watermark. I see the same watermarks from the
>>>>> two
>>>>>>> versions of code.
>>>>>>>
>>>>>>> The processing on the keyed stream doesn't change event time at all. I
>>>>> can
>>>>>>> simply change my code to use `map` on the keyed stream to return back
>>>>> the
>>>>>>> input data, so that the window operator receives the exactly same
>>>>> data. The
>>>>>>> only difference is when I do `assignTimestampsAndWatermarks`. The
>>>>> result is
>>>>>>> the same, `assignTimestampsAndWatermarks` before `keyBy` works:
>>>>>>> ```java
>>>>>>> DataStream<Trip> trips =
>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(new
>>>>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>>>     @Override
>>>>>>>     public long extractTimestamp(Trip trip) {
>>>>>>>         return trip.endTime.getTime();
>>>>>>>     }
>>>>>>> });
>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
>>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>>>>> featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>>>         Time.days(1));
>>>>>>> ```
>>>>>>>
>>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
>>>>>>> ```java
>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>>>> DataStream<Trip> featurizedUserTrips =
>>>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
>>>>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>>>     @Override
>>>>>>>     public long extractTimestamp(Trip trip) {
>>>>>>>         return trip.endTime.getTime();
>>>>>>>     }
>>>>>>> });
>>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>>>>> featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>>>         Time.days(1));
>>>>>>> ```
>>>>>>>
>>>>>>> It feels a bug to me, but I want to confirm it before I file the bug
>>>>>>> report.
>>>>>>>
>>>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Could you check the watermark of the window operator? One possible
>>>>>>> situation would be some of the keys are not getting enough inputs, so
>>>>> their
>>>>>>> watermarks remain below the window end time and hold the window
>>>>> operator
>>>>>>> watermark back. IMO, it’s a good practice to assign watermark earlier
>>>>> in
>>>>>>> the data pipeline.
>>>>>>>> Best,
>>>>>>>> Paul Lam
>>>>>>>>
>>>>>>>>> 在 2019年4月17日,23:04,an0...@gmail.com 写道:
>>>>>>>>>
>>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
>>>>>>>>> ```java
>>>>>>>>> DataStream<Trip> trips =
>>>>>>>>>        env.addSource(consumer).assignTimestampsAndWatermarks(new
>>>>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>>>>>            @Override
>>>>>>>>>            public long extractTimestamp(Trip trip) {
>>>>>>>>>                return trip.endTime.getTime();
>>>>>>>>>            }
>>>>>>>>>        });
>>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip ->
>>>>> trip.userId);
>>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
>>>>> userTrips.process(new
>>>>>>> Featurization());
>>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>>>>>>>>>        featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>>>>>                Time.days(1));
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> But not after `keyBy` and `process`:
>>>>>>>>> ```java
>>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
>>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip ->
>>>>> trip.userId);
>>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
>>>>>>>>>        userTrips.process(new
>>>>>>> Featurization()).assignTimestampsAndWatermarks(new
>>>>>>> BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
>>>>>>>>>            @Override
>>>>>>>>>            public long extractTimestamp(FeaturizedTrip trip) {
>>>>>>>>>                return trip.endTime.getTime();
>>>>>>>>>            }
>>>>>>>>>        });
>>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>>>>>>>>>        featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>>>>>                Time.days(1));
>>>>>>>>> ```
>>>>>>>>> Windows are never triggered.
>>>>>>>>>
>>>>>>>>> Is it a bug or expected behavior? If the latter, where is it
>>>>>>> documented?
>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to