EugeneYushin commented on a change in pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams URL: https://github.com/apache/flink/pull/6646#discussion_r217879647
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ##########
@@ -239,7 +245,17 @@ protected WithWindow(DataStream<T1> input1,
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, trigger, newEvictor);
+					windowAssigner, trigger, newEvictor, allowedLateness);
+		}
+
+		/**
+		 * Sets the time by which elements are allowed to be late.
+		 * @see WindowedStream#allowedLateness(Time)
+		 */
+		@PublicEvolving
+		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {

Review comment:
A null check at this point breaks the current logic of the CoGroup/Join classes.

CoGroup does not check for nulls directly in the `evictor`/`trigger` methods; it validates them during delegation: https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L344
I do the null check there for the `allowedLateness` field as well.

At the same time, `Join.apply` delegates to `CoGroup.apply`: https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java#L314
To be consistent, we would also have to add null checks for the evictor and trigger.

Adding null checks directly in the setters/constructor breaks the chain of calls in the `apply` methods (for evictor/trigger/allowedLateness) and would require clumsy if-else conditions for each nullable field separately. Both CoGroup and Join allow null for trigger/evictor (and I've added allowedLateness following the same approach), but null values are not passed on for validation when `apply(...)` is called.
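To make the chain-of-calls argument concrete, here is a minimal, Flink-free sketch under stated assumptions. `CoGroupSpec` and `JoinSpec` are hypothetical stand-ins (not Flink APIs) for `CoGroupedStreams.WithWindow` and `JoinedStreams.WithWindow`, simplified to mutable builders; `java.time.Duration` stands in for Flink's `Time`, and a `String` stands in for a trigger. `JoinSpec.apply()` forwards every optional field through the fluent setters, roughly mirroring the linked chain in `JoinedStreams.apply`, so an eager `requireNonNull` in a setter fails as soon as the user leaves a field unset, while deferring the null check to the delegation step does not.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, Flink-free model of the CoGroup/Join delegation (not Flink API).
class ChainDemo {
    public static void main(String[] args) {
        // The user never sets trigger or allowedLateness: the lenient chain completes.
        System.out.println(new JoinSpec(false).apply());
        try {
            // Same call, but CoGroupSpec.allowedLateness(...) rejects null eagerly.
            new JoinSpec(true).apply();
        } catch (NullPointerException e) {
            System.out.println("strict setter failed: " + e.getMessage());
        }
    }
}

// Stand-in for CoGroupedStreams.WithWindow; optional fields may be null.
class CoGroupSpec {
    private final boolean strictSetter; // true = null check inside the setter
    private String trigger;             // null = keep the default trigger
    private Duration allowedLateness;   // null = keep the default lateness

    CoGroupSpec(boolean strictSetter) {
        this.strictSetter = strictSetter;
    }

    CoGroupSpec trigger(String newTrigger) {
        this.trigger = newTrigger;
        return this;
    }

    CoGroupSpec allowedLateness(Duration newLateness) {
        this.allowedLateness = strictSetter
                ? Objects.requireNonNull(newLateness, "allowedLateness")
                : newLateness;
        return this;
    }

    // Delegation step: only non-null fields are forwarded to the windowed stream,
    // which is where the real validation would take place.
    List<String> apply() {
        List<String> forwarded = new ArrayList<>();
        if (trigger != null) {
            forwarded.add("trigger=" + trigger);
        }
        if (allowedLateness != null) {
            forwarded.add("allowedLateness=" + allowedLateness);
        }
        return forwarded;
    }
}

// Stand-in for JoinedStreams.WithWindow; apply() forwards every field, set or not.
class JoinSpec {
    private final boolean strictSetter;
    private String trigger;           // stays null unless trigger(...) is called
    private Duration allowedLateness; // stays null unless allowedLateness(...) is called

    JoinSpec(boolean strictSetter) {
        this.strictSetter = strictSetter;
    }

    JoinSpec trigger(String newTrigger) {
        this.trigger = newTrigger;
        return this;
    }

    JoinSpec allowedLateness(Duration newLateness) {
        this.allowedLateness = newLateness;
        return this;
    }

    // Mirrors the unconditional call chain: no per-field if/else is needed
    // as long as the CoGroup setters accept null.
    List<String> apply() {
        return new CoGroupSpec(strictSetter)
                .trigger(trigger)
                .allowedLateness(allowedLateness)
                .apply();
    }
}
```

Running `ChainDemo` prints `[]` and then `strict setter failed: allowedLateness`. Keeping the strict setter would instead mean wrapping each forwarded call in its own `if (field != null)` inside `JoinSpec.apply()`, which is the per-field if-else the comment calls clumsy.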
Because of the null check for `allowedLateness` inside the setter, the following scenario fails: https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala#L151
It breaks when the user doesn't set any of trigger/evictor/allowedLateness. Yet these fields are optional and have defaults in WindowedStream. Unfortunately, the default for allowedLateness in WindowedStream is private (and I don't think it's good practice to substitute a default when the user passed null by mistake).

Please let me know your thoughts.
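The default-value point can be sketched the same way. `Windowed` below is a hypothetical stand-in for `WindowedStream` (not its real code): its default lateness is a private field the caller cannot reference (the 0 ms value is an assumption for the sketch), and its setter does the actual validation (here, only rejecting negative values). `CoGroupDelegation` is a hypothetical stand-in for the CoGroup `apply(...)` step: because it calls the setter only when the user supplied a value, the private default stays in effect without the caller ever needing to know what it is.

```java
import java.time.Duration;

class DefaultDemo {
    public static void main(String[] args) {
        System.out.println(CoGroupDelegation.effectiveLateness(null));                   // default kept
        System.out.println(CoGroupDelegation.effectiveLateness(Duration.ofMillis(250))); // user value
    }
}

// Hypothetical stand-in for WindowedStream: the default is private to this class.
class Windowed {
    private long allowedLatenessMillis = 0L; // assumed default, invisible to callers

    Windowed allowedLateness(Duration lateness) {
        long millis = lateness.toMillis(); // a null argument throws NullPointerException here
        if (millis < 0) {
            throw new IllegalArgumentException("allowed lateness must not be negative");
        }
        this.allowedLatenessMillis = millis;
        return this;
    }

    long allowedLatenessMillis() {
        return allowedLatenessMillis;
    }
}

// Hypothetical stand-in for the CoGroup apply(...) delegation step.
final class CoGroupDelegation {
    private CoGroupDelegation() {}

    static long effectiveLateness(Duration userLateness) {
        Windowed windowed = new Windowed();
        // Forward only a value the user actually set; otherwise Windowed keeps its default.
        if (userLateness != null) {
            windowed.allowedLateness(userLateness);
        }
        return windowed.allowedLatenessMillis();
    }
}
```

Running `DefaultDemo` prints `0` and then `250`. Note that this skip-on-null approach treats an explicit null the same as "not set"; it avoids exposing the private default but does not by itself catch a null passed by mistake.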