Hi,

Could you check the watermark of the window operator? One possible situation 
would be some of the keys are not getting enough inputs, so their watermarks 
remain below the window end time and hold the window operator watermark back. 
IMO, it’s a good practice to assign watermark earlier in the data pipeline.

Best,
Paul Lam

> 在 2019年4月17日,23:04,an0...@gmail.com 写道:
> 
> `assignTimestampsAndWatermarks` before `keyBy` works:
> ```java
> DataStream<Trip> trips =
>        env.addSource(consumer).assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>            @Override
>            public long extractTimestamp(Trip trip) {
>                return trip.endTime.getTime();
>            }
>        });
> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new 
> Featurization());
> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>        featurizedUserTrips.timeWindowAll(Time.days(7),
>                Time.days(1));
> ```
> 
> But not after `keyBy` and `process`:
> ```java
> DataStream<Trip> trips = env.addSource(consumer);
> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<FeaturizedTrip> featurizedUserTrips =
>        userTrips.process(new 
> Featurization()).assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
>            @Override
>            public long extractTimestamp(FeaturizedTrip trip) {
>                return trip.endTime.getTime();
>            }
>        });
> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>        featurizedUserTrips.timeWindowAll(Time.days(7),
>                Time.days(1));
> ```
> Windows are never triggered.
> 
> Is it a bug or expected behavior? If the latter, where is it documented?
> 

Reply via email to