Hi,

Could you check the current watermark of the window operator? One possible explanation is that some of the keys (more precisely, the parallel subtasks those keys are routed to) are not receiving enough input, so their watermarks stay below the window end time and hold back the window operator's watermark, which is the minimum across its inputs. IMO, it's good practice to assign timestamps and watermarks as early in the data pipeline as possible.
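To illustrate the hold-back effect, here is a toy model in plain Java. It is not Flink code: the class `WatermarkSketch` and its methods are made up for this example. It assumes only that an operator's watermark is the minimum of its input watermarks, that an input which has not emitted a watermark yet sits at `Long.MIN_VALUE`, and that an event-time window fires once the watermark reaches the window end minus one.

```java
import java.util.Arrays;

/** Toy model of watermark propagation; hypothetical names, not the Flink API. */
public class WatermarkSketch {
    // Assumption: an input that has emitted no watermark yet reports Long.MIN_VALUE.
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    /** The operator's watermark is the minimum over all of its input watermarks. */
    public static long operatorWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    /** A window fires once the operator watermark reaches its last timestamp (end - 1). */
    public static boolean windowFires(long windowEnd, long[] inputWatermarks) {
        return operatorWatermark(inputWatermarks) >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1_000L;

        // Every upstream subtask has advanced past the window end.
        long[] allAdvancing = {1_500L, 1_200L, 1_050L};
        // One upstream subtask has seen no data, so it never advanced.
        long[] oneIdle = {1_500L, 1_200L, NO_WATERMARK};

        System.out.println(windowFires(windowEnd, allAdvancing)); // true
        System.out.println(windowFires(windowEnd, oneIdle));      // false
    }
}
```

If your job matches the second case, a single subtask without input after `keyBy` would be enough to keep the `timeWindowAll` windows from ever firing.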
Best,
Paul Lam

> On April 17, 2019, at 23:04, an0...@gmail.com wrote:
>
> `assignTimestampsAndWatermarks` before `keyBy` works:
> ```java
> DataStream<Trip> trips =
>     env.addSource(consumer).assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>             @Override
>             public long extractTimestamp(Trip trip) {
>                 return trip.endTime.getTime();
>             }
>         });
> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<FeaturizedTrip> featurizedUserTrips =
>     userTrips.process(new Featurization());
> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> ```
>
> But not after `keyBy` and `process`:
> ```java
> DataStream<Trip> trips = env.addSource(consumer);
> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<FeaturizedTrip> featurizedUserTrips =
>     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
>             @Override
>             public long extractTimestamp(FeaturizedTrip trip) {
>                 return trip.endTime.getTime();
>             }
>         });
> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> ```
> Windows are never triggered.
>
> Is it a bug or expected behavior? If the latter, where is it documented?