EugeneYushin commented on a change in pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217879647
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ##########
 @@ -239,7 +245,17 @@ protected WithWindow(DataStream<T1> input1,
                @PublicEvolving
                public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
                        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-                                       windowAssigner, trigger, newEvictor);
+                                       windowAssigner, trigger, newEvictor, allowedLateness);
+               }
+
+               /**
+                * Sets the time by which elements are allowed to be late.
+                * @see WindowedStream#allowedLateness(Time)
+                */
+               @PublicEvolving
+               public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
 
 Review comment:
   A null check in this place breaks the current logic of the CoGroup/Join 
classes. CoGroup has no null checks directly in the `evictor`/`trigger` methods 
and validates during delegation instead:
   
https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L344
   
   I do the null check there for the `allowedLateness` field as well.
   At the same time, `Join.apply` delegates to `CoGroup.apply`:
   
https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java#L314
 
   
   To be consistent, we would then also have to add null checks for evictor and 
trigger. Adding null checks directly in the setters/constructor breaks the chain 
of calls in the `apply` methods (for evictor/trigger/allowedLateness) and 
requires clumsy if-else conditions for each nullable field separately.
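   
   To illustrate the point, here is a minimal sketch. It uses hypothetical 
stand-in classes (`WindowedSketch`, `CoGroupSketch`, `JoinSketch`) with string 
and millisecond placeholders rather than the real Flink types, so it only shows 
the shape of the problem under those assumptions, not the actual implementation:

```java
import java.util.Objects;

/** Hypothetical stand-in for the windowed stream that owns the real defaults. */
final class WindowedSketch {
    private String trigger = "default-trigger"; // default is private to this class
    private long allowedLatenessMs = 0L;        // default is private to this class

    WindowedSketch trigger(String newTrigger) {
        this.trigger = Objects.requireNonNull(newTrigger, "trigger");
        return this;
    }

    WindowedSketch allowedLateness(long newLatenessMs) {
        this.allowedLatenessMs = newLatenessMs;
        return this;
    }

    String describe() {
        return trigger + "/" + allowedLatenessMs;
    }
}

/** Hypothetical stand-in for the co-group builder; null means "not set by the user". */
final class CoGroupSketch {
    private final String trigger;
    private final Long allowedLatenessMs;

    CoGroupSketch(String trigger, Long allowedLatenessMs) {
        this.trigger = trigger;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Lenient setters: null is accepted here and filtered out later in apply().
    CoGroupSketch trigger(String newTrigger) {
        return new CoGroupSketch(newTrigger, allowedLatenessMs);
    }

    CoGroupSketch allowedLateness(Long newLatenessMs) {
        return new CoGroupSketch(trigger, newLatenessMs);
    }

    // Strict variant: the eager null check under discussion.
    CoGroupSketch allowedLatenessStrict(Long newLatenessMs) {
        return new CoGroupSketch(trigger, Objects.requireNonNull(newLatenessMs, "allowedLateness"));
    }

    // Validation during delegation: only fields the user set are forwarded,
    // so the downstream defaults stay in effect for everything else.
    String apply() {
        WindowedSketch windowed = new WindowedSketch();
        if (trigger != null) {
            windowed.trigger(trigger);
        }
        if (allowedLatenessMs != null) {
            windowed.allowedLateness(allowedLatenessMs);
        }
        return windowed.describe();
    }
}

/** Hypothetical stand-in for the join builder, which forwards its fields unconditionally. */
final class JoinSketch {
    private final String trigger;
    private final Long allowedLatenessMs;

    JoinSketch(String trigger, Long allowedLatenessMs) {
        this.trigger = trigger;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Works even when nothing was set: the nulls pass through the lenient setters.
    String apply() {
        return new CoGroupSketch(null, null)
                .trigger(trigger)
                .allowedLateness(allowedLatenessMs)
                .apply();
    }

    // Throws NullPointerException when allowedLateness was never set by the user.
    String applyWithStrictSetter() {
        return new CoGroupSketch(null, null)
                .trigger(trigger)
                .allowedLatenessStrict(allowedLatenessMs)
                .apply();
    }
}
```

   In this sketch, `new JoinSketch(null, null).apply()` falls back to the 
downstream defaults, while `applyWithStrictSetter()` fails unless the join side 
wraps every forwarded field in its own if-else.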
   
   Both CoGroup and Join allow null for trigger/evictor (and I've added 
allowedLateness following the same approach), but null values do not pass the 
validation performed during calls to `apply(...)`.
   
   With a null check for `allowedLateness` inside the setter, we get errors in 
the following scenario:
   
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala#L151
   
   It breaks when the user leaves any of trigger/evictor/allowedLateness 
unspecified. At the same time, these fields are optional and have defaults in 
WindowedStream. Unfortunately, the default for allowedLateness in WindowedStream 
is private (and I don't think it's good practice to silently fall back to a 
default when the user passed null by mistake).
   
   Please let me know your thoughts.

