[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880239#comment-16880239 ]

Congxian Qiu(klion26) commented on FLINK-10050:

Thanks for your reply [~kkl0u], I've created a new issue FLINK-13148 to track it.

> Support 'allowedLateness' in CoGroupedStreams
> ---------------------------------------------
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Affects Versions: 1.5.1, 1.6.0
> Reporter: eugen yushin
> Priority: Major
> Labels: pull-request-available, ready-to-commit, windows
> Fix For: 1.7.0
>
> WindowedStream supports the 'allowedLateness' feature, while
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part
> of CoGroupedStreams and all main functionality (like evictor/trigger/...) is
> simply delegated to WindowedStream.
> There is no way to handle late-arriving data from previous steps in
> cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b due to the lack of an
> `allowedLateness` API call in CoGroupedStreams.java.
> Scope: add method `WithWindow.allowedLateness` to Java API
> (flink-streaming-java) and extend Scala API (flink-streaming-scala).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
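To make the "late data" in step c concrete, here is a minimal sketch of the lateness rule as I understand it for event-time windows: a window counts as late once the watermark has reached its last timestamp plus the allowed lateness. The class and method names (`LatenessSketch`, `isWindowLate`) are illustrative assumptions for this sketch and are not Flink classes.

```java
import java.time.Duration;

// Simplified, hypothetical model of the lateness rule; not Flink code.
final class LatenessSketch {

    // Last timestamp that still belongs to a window covering [start, end).
    static long maxTimestamp(long windowEndMs) {
        return windowEndMs - 1;
    }

    // A window is treated as late once the watermark has reached
    // maxTimestamp + allowedLateness; elements for it are then dropped.
    static boolean isWindowLate(long windowEndMs, Duration allowedLateness, long watermarkMs) {
        long cleanupTime = maxTimestamp(windowEndMs) + allowedLateness.toMillis();
        return cleanupTime <= watermarkMs;
    }
}
```

With zero allowed lateness, an element for the window ending at 10 000 ms is dropped as soon as the watermark passes 9 999 ms; with one second of allowed lateness the same window keeps accepting elements until the watermark reaches 10 999 ms.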
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880179#comment-16880179 ]

Kostas Kloudas commented on FLINK-10050:

Yes, we should open a new issue to track it, and please write in the description how you are planning to implement it so that we can chat there. Thanks [~klion26]!
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880113#comment-16880113 ]

Congxian Qiu(klion26) commented on FLINK-10050:

Thanks for your reply and confirmation [~kkl0u]. Yes, you're right. Should I open a new issue for this or reuse the current issue to track it? As for the implementation, I think we should add an input parameter of type {{OutputTag}} to {{CoGroupedStreams#WithWindow}}.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880103#comment-16880103 ]

Kostas Kloudas commented on FLINK-10050:

Hi [~klion26]! I agree that the user should be able to get the side output containing the late data. So we should essentially align the semantics and the functionality of the CoGroupedStreams with those of the WindowedStream. If I am not mistaken, this means exposing WindowedStream.sideOutputLateData() from the CoGroupedStreams, right?
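The idea of a side output for late data can be illustrated with a small stand-alone model: instead of silently dropping an element whose window is already late, the operator hands it to a separate collection. This is only a sketch under simplifying assumptions (tumbling windows, millisecond timestamps, a manually advanced watermark); `LateDataRouterSketch` and its members are hypothetical names and do not exist in Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of routing late elements to a side output; not Flink code.
final class LateDataRouterSketch {
    final List<Long> accepted = new ArrayList<>(); // elements that still reach their window
    final List<Long> late = new ArrayList<>();     // stand-in for the late-data side output

    private final long windowSizeMs;
    private final long allowedLatenessMs;
    private long watermarkMs = Long.MIN_VALUE;

    LateDataRouterSketch(long windowSizeMs, long allowedLatenessMs) {
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Watermarks only move forward.
    void advanceWatermark(long newWatermarkMs) {
        watermarkMs = Math.max(watermarkMs, newWatermarkMs);
    }

    // Assigns the element to its tumbling window and routes it by lateness.
    void onElement(long timestampMs) {
        long windowEnd = timestampMs - Math.floorMod(timestampMs, windowSizeMs) + windowSizeMs;
        long cleanupTime = (windowEnd - 1) + allowedLatenessMs;
        if (cleanupTime <= watermarkMs) {
            late.add(timestampMs);
        } else {
            accepted.add(timestampMs);
        }
    }
}
```

In this model a caller reads `late` the way a Flink user would read the side output of a windowed stream; without such a channel the late elements would simply be lost.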
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879986#comment-16879986 ]

Congxian Qiu(klion26) commented on FLINK-10050:

[~aljoscha] [~kkl0u] Since we now support {{allowedLateness}} here, maybe we should also support attaching an {{OutputTag}} to the inner windowed stream. What do you think?
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622047#comment-16622047 ]

ASF GitHub Bot commented on FLINK-10050:

asfgit closed pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 55009e1b4cb..c8b552708c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -20,6 +20,7 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -40,6 +41,7 @@
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -183,7 +185,7 @@ public EqualTo equalTo(KeySelector keySelector, TypeInformation ke
   */
  @PublicEvolving
  public WithWindow window(WindowAssigner, W> assigner) {
-     return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+     return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
  }
  }
  }
@@ -215,6 +217,12 @@ public EqualTo equalTo(KeySelector keySelector, TypeInformation ke
  private final Evictor, ? super W> evictor;
+ @VisibleForTesting
+ Time allowedLateness;
+
+ @VisibleForTesting
+ WindowedStream, KEY, W> windowOp;
+
  protected WithWindow(DataStream input1,
          DataStream input2,
          KeySelector keySelector1,
@@ -222,7 +230,8 @@ protected WithWindow(DataStream input1,
          TypeInformation keyType,
          WindowAssigner, W> windowAssigner,
          Trigger, ? super W> trigger,
-         Evictor, ? super W> evictor) {
+         Evictor, ? super W> evictor,
+         Time allowedLateness) {
      this.input1 = input1;
      this.input2 = input2;
@@ -233,6 +242,8 @@ protected WithWindow(DataStream input1,
      this.windowAssigner = windowAssigner;
      this.trigger = trigger;
      this.evictor = evictor;
+
+     this.allowedLateness = allowedLateness;
  }
  /**
@@ -241,7 +252,7 @@ protected WithWindow(DataStream input1,
  @PublicEvolving
  public WithWindow trigger(Trigger, ? super W> newTrigger) {
      return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-             windowAssigner, newTrigger, evictor);
+             windowAssigner, newTrigger, evictor, allowedLateness);
  }
  /**
@@ -254,7 +265,18 @@ protected WithWindow(DataStream input1,
  @PublicEvolving
  public WithWindow evictor(Evictor, ? super W> newEvictor) {
      return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-             windowAssigner, trigger, newEvictor);
+             windowAssigner, trigger, newEvictor, allowedLateness);
+ }
+
+ /**
+  * Sets the time by which elements are allowed to be late.
+  * @see WindowedStream#allowedLateness(Time)
+
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621912#comment-16621912 ]

ASF GitHub Bot commented on FLINK-10050:

EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-423161496

@kl0u Great news!

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support 'allowedLateness' in CoGroupedStreams
> ---------------------------------------------
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.1, 1.6.0
> Reporter: eugen yushin
> Priority: Major
> Labels: pull-request-available, ready-to-commit, windows
>
> WindowedStream supports the 'allowedLateness' feature, while
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part
> of CoGroupedStreams and all main functionality (like evictor/trigger/...) is
> simply delegated to WindowedStream.
> There is no way to handle late-arriving data from previous steps in
> cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b due to the lack of an
> `allowedLateness` API call in CoGroupedStreams.java.
> Scope: add method `WithWindow.allowedLateness` to Java API
> (flink-streaming-java) and extend Scala API (flink-streaming-scala).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620532#comment-16620532 ]

ASF GitHub Bot commented on FLINK-10050:

EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422789390

Nice, the CI checks have passed. I think I have no access to Travis to relaunch builds (at least, I don't see any button there for that kind of operation). Thanks for the cooperation.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620456#comment-16620456 ]

ASF GitHub Bot commented on FLINK-10050:

kl0u commented on issue #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422757135

I would recommend relaunching that specific build to see if it happens again.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620450#comment-16620450 ]

ASF GitHub Bot commented on FLINK-10050:

EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422754323

I see that one Travis build finished with an error:

```
10:15:13.087 [ERROR] Failed to execute goal on project flink-storm-examples_2.11: Could not resolve dependencies for project org.apache.flink:flink-storm-examples_2.11:jar:1.7-SNAPSHOT: Could not transfer artifact org.apache.storm:storm-starter:jar:1.0.0 from/to central (http://repo.maven.apache.org/maven2): GET request of: org/apache/storm/storm-starter/1.0.0/storm-starter-1.0.0.jar from central failed: Connection reset -> [Help 1]
```

It does not look related to the proposed changes. What is the best way to deal with this kind of failure during CI? Is there any way to rerun the PR build?
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620435#comment-16620435 ]

ASF GitHub Bot commented on FLINK-10050:

EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422746729

@kl0u I didn't want to ping you until Travis had finished. Thanks for the quick turnaround.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620410#comment-16620410 ]

ASF GitHub Bot commented on FLINK-10050:

kl0u commented on issue #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422741692

Thanks for updating the PR @EugeneYushin. I will have a look today.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618880#comment-16618880 ]

ASF GitHub Bot commented on FLINK-10050:

kl0u commented on a change in pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r218378742

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ##

@@ -239,7 +245,17 @@ protected WithWindow(DataStream input1,
  @PublicEvolving
  public WithWindow evictor(Evictor, ? super W> newEvictor) {
      return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-             windowAssigner, trigger, newEvictor);
+             windowAssigner, trigger, newEvictor, allowedLateness);
+ }
+
+ /**
+  * Sets the time by which elements are allowed to be late.
+  * @see WindowedStream#allowedLateness(Time)
+  */
+ @PublicEvolving
+ public WithWindow allowedLateness(Time newLateness) {

Review comment: I replied on the mailing list.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618785#comment-16618785 ]

ASF GitHub Bot commented on FLINK-10050:

EugeneYushin commented on a change in pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r218365268

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ##

@@ -239,7 +245,17 @@ protected WithWindow(DataStream input1,
  @PublicEvolving
  public WithWindow evictor(Evictor, ? super W> newEvictor) {
      return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-             windowAssigner, trigger, newEvictor);
+             windowAssigner, trigger, newEvictor, allowedLateness);
+ }
+
+ /**
+  * Sets the time by which elements are allowed to be late.
+  * @see WindowedStream#allowedLateness(Time)
+  */
+ @PublicEvolving
+ public WithWindow allowedLateness(Time newLateness) {

Review comment: @kl0u can you please take a look at https://lists.apache.org/list.html?d...@flink.apache.org ? I'm in the process of writing unit tests, and I can't get stable results so far. Do you have any thoughts on this topic?
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616355#comment-16616355 ]

ASF GitHub Bot commented on FLINK-10050:

kl0u commented on a change in pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217887352

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ##
@@ -239,7 +245,17 @@ protected WithWindow(DataStream input1,
   @PublicEvolving
   public WithWindow evictor(Evictor, ? super W> newEvictor) {
     return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-      windowAssigner, trigger, newEvictor);
+      windowAssigner, trigger, newEvictor, allowedLateness);
+  }
+
+  /**
+   * Sets the time by which elements are allowed to be late.
+   * @see WindowedStream#allowedLateness(Time)
+   */
+  @PublicEvolving
+  public WithWindow allowedLateness(Time newLateness) {

Review comment:
The `null` check should be applied to the user-specified argument (`newLateness`), not to the class field `allowedLateness`. The field `allowedLateness` can be null, as this is its default value. Such a check does not break anything; it is there to state that if you call the method, you cannot pass `null` as an argument. Nobody said to add checks for the `evictor` and the `trigger`.

In addition, you should add some tests to the PR. This is not only to test that everything works, but also to guarantee that nobody breaks this functionality in the future.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support 'allowedLateness' in CoGroupedStreams
> ---------------------------------------------
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.1, 1.6.0
> Reporter: eugen yushin
> Priority: Major
> Labels: pull-request-available, ready-to-commit, windows
>
> WindowedStream supports the 'allowedLateness' feature, while CoGroupedStreams does not. At the same time, WindowedStream is an inner part of CoGroupedStreams, and all main functionality (evictor, trigger, ...) is simply delegated to WindowedStream.
> There is no way to handle late-arriving data from previous steps in cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b because CoGroupedStreams.java lacks an `allowedLateness` API call.
> Scope: add method `WithWindow.allowedLateness` to the Java API (flink-streaming-java) and extend the Scala API (flink-streaming-scala).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
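The distinction made in the review comment above (reject a `null` argument, but let the field default to `null`) can be sketched as follows. This is an illustration only: `NullCheckSketch` and `WithWindowSketch` are hypothetical stand-ins rather than Flink classes, and `java.time.Duration` is used in place of Flink's `Time` so the snippet compiles on its own.

```java
import java.time.Duration;
import java.util.Objects;

final class NullCheckSketch {

    /** Hypothetical immutable builder, loosely modelled on a WithWindow-style class. */
    static final class WithWindowSketch {
        // null means "not set by the user"; this is the default value of the field.
        private final Duration allowedLateness;

        WithWindowSketch() {
            this(null);
        }

        private WithWindowSketch(Duration allowedLateness) {
            this.allowedLateness = allowedLateness;
        }

        /** Rejects a null argument, while the field itself may stay null if never set. */
        WithWindowSketch allowedLateness(Duration newLateness) {
            Objects.requireNonNull(newLateness, "newLateness must not be null");
            return new WithWindowSketch(newLateness);
        }

        Duration getAllowedLateness() {
            return allowedLateness;
        }
    }

    public static void main(String[] args) {
        WithWindowSketch unset = new WithWindowSketch();
        System.out.println(unset.getAllowedLateness());          // field defaults to null
        WithWindowSketch set = unset.allowedLateness(Duration.ofMillis(5));
        System.out.println(set.getAllowedLateness().toMillis()); // explicitly set value
    }
}
```

A user who never calls the setter is unaffected by the check; only an explicit `allowedLateness(null)` call fails, and it fails at the call site.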
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616225#comment-16616225 ]

ASF GitHub Bot commented on FLINK-10050:

EugeneYushin commented on a change in pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217879647

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ##
@@ -239,7 +245,17 @@ protected WithWindow(DataStream input1,
   @PublicEvolving
   public WithWindow evictor(Evictor, ? super W> newEvictor) {
     return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-      windowAssigner, trigger, newEvictor);
+      windowAssigner, trigger, newEvictor, allowedLateness);
+  }
+
+  /**
+   * Sets the time by which elements are allowed to be late.
+   * @see WindowedStream#allowedLateness(Time)
+   */
+  @PublicEvolving
+  public WithWindow allowedLateness(Time newLateness) {

Review comment:
Checking for null in this place breaks the current logic of the CoGroup/Join classes. CoGroup has no null checks directly in the `evictor`/`trigger` methods and validates during delegation:
https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L344
I do the null check there for the `allowedLateness` field as well. At the same time, Join.apply delegates to CoGroup.apply:
https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java#L314

To be consistent, we should also add null checks for evictor and trigger. Adding null checks directly in the setters/constructor breaks the chain of calls in the `apply` methods (for evictor/trigger/allowedLateness) and requires clumsy if-else conditions for each nullable field separately. Both CoGroup and Join allow null for trigger/evictor (and I've added allowedLateness following the same approach) but don't pass validation during calls to `apply(...)`.

As a result of the null check for `allowedLateness` inside the setter, we get errors in the following scenario:
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala#L151
It breaks when the user doesn't specify any of trigger/evictor/allowedLateness. At the same time, these fields are optional and have defaults in WindowedStream. Unfortunately, the default for allowedLateness in WindowedStream has a private modifier (and I don't think it's good practice to set a default when the user passed null by mistake).

Please let me know your thoughts.
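The "validate during delegation" approach described in the comment above can be sketched like this. The names are hypothetical and the inner stream is a stub that only records which setters were called; it is meant to show the shape of the `apply` logic, not Flink's actual implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class DelegationSketch {

    /** Hypothetical stand-in for the inner windowed stream; records the settings applied to it. */
    static final class InnerWindow {
        final List<String> applied = new ArrayList<>();

        void trigger(String trigger) { applied.add("trigger=" + trigger); }
        void evictor(String evictor) { applied.add("evictor=" + evictor); }
        void allowedLateness(Duration lateness) { applied.add("allowedLateness=" + lateness.toMillis() + "ms"); }
    }

    /**
     * Forwards only the optional settings the user actually provided.
     * A null value means "not set", so the inner window keeps its own default.
     */
    static InnerWindow apply(String trigger, String evictor, Duration allowedLateness) {
        InnerWindow window = new InnerWindow();
        if (trigger != null) {
            window.trigger(trigger);
        }
        if (evictor != null) {
            window.evictor(evictor);
        }
        if (allowedLateness != null) {
            window.allowedLateness(allowedLateness);
        }
        return window;
    }

    public static void main(String[] args) {
        System.out.println(apply(null, null, null).applied);
        System.out.println(apply(null, null, Duration.ofMillis(5)).applied);
    }
}
```

In this shape, an unset field never reaches the inner stream, so a job that specifies none of trigger/evictor/allowedLateness still runs with the inner stream's defaults.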
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614876#comment-16614876 ]

ASF GitHub Bot commented on FLINK-10050:

kl0u commented on a change in pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217717346

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ##
@@ -239,7 +245,17 @@ protected WithWindow(DataStream input1,
   @PublicEvolving
   public WithWindow evictor(Evictor, ? super W> newEvictor) {
     return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-      windowAssigner, trigger, newEvictor);
+      windowAssigner, trigger, newEvictor, allowedLateness);
+  }
+
+  /**
+   * Sets the time by which elements are allowed to be late.
+   * @see WindowedStream#allowedLateness(Time)
+   */
+  @PublicEvolving
+  public WithWindow allowedLateness(Time newLateness) {

Review comment:
Check for `null` argument.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614875#comment-16614875 ]

ASF GitHub Bot commented on FLINK-10050:

kl0u commented on a change in pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217716951

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ##
@@ -254,7 +260,17 @@ protected WithWindow(DataStream input1,
   @PublicEvolving
   public WithWindow evictor(Evictor, ? super W> newEvictor) {
     return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-      windowAssigner, trigger, newEvictor);
+      windowAssigner, trigger, newEvictor, allowedLateness);
+  }
+
+  /**
+   * Sets the time by which elements are allowed to be late.
+   * @see WindowedStream#allowedLateness(Time)
+   */
+  @PublicEvolving
+  public WithWindow allowedLateness(Time newLateness) {
+    return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,

Review comment:
Check for `null`. If the user calls `allowedLateness`, they should not pass a `null` value.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614842#comment-16614842 ]

ASF GitHub Bot commented on FLINK-10050:

kl0u commented on issue #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-421359975

Hi @EugeneYushin. This PR has no tests. Please add them.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602400#comment-16602400 ]

ASF GitHub Bot commented on FLINK-10050:

EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-418175648

@aljoscha @tillrohrmann guys, could you please take a look at it?
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601172#comment-16601172 ]

eugen yushin commented on FLINK-10050:

[~aljoscha], [~till.rohrmann]
Guys, could you please take a look at the PR?
I didn't add unit tests because:
a. there are no mock tests for the referenced files in the master branch that cover delegates such as evictor/trigger/...
b. 'allowedLateness' is a feature of WindowedStream, and the proposed fix simply delegates all the work to the WindowedStream logic
Regards
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601166#comment-16601166 ]

ASF GitHub Bot commented on FLINK-10050:

EugeneYushin opened a new pull request #6646: [FLINK-10050] Support allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646

## What is the purpose of the change
[https://issues.apache.org/jira/browse/FLINK-10050](https://issues.apache.org/jira/browse/FLINK-10050)
Add 'allowedLateness' method to coGroup and join streams API.

## Brief change log
- add 'allowedLateness' for CoGroupedStreams/JoinedStreams java and scala API
- delegate calls to underlying WindowedStream (as for Trigger/Evictor scenario)

## Verifying this change
This change is a trivial rework.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16597537#comment-16597537 ]

eugen yushin commented on FLINK-10050:

Thanks, I'll proceed with the PR then and will keep you posted.

> Support 'allowedLateness' in CoGroupedStreams
> ---------------------------------------------
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.1, 1.6.0
> Reporter: eugen yushin
> Priority: Major
> Labels: ready-to-commit, windows
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16597246#comment-16597246 ]

Aljoscha Krettek commented on FLINK-10050:

I see. I think then this addition would be ok. 👌
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591511#comment-16591511 ]

eugen yushin commented on FLINK-10050:

[~aljoscha]
No operator in Flink retains information about windows in its results.
Docs: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results
```
The result of a windowed operation is again a {{DataStream}}, no information about the windowed operations is retained in the result elements
```
At the same time, coGroup/join keeps the elements' timestamps, so consecutive operators can assign elements to their respective windows.
Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
```
Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with {{[5, 10)}} as its boundaries would result in the joined elements having 9 as their timestamp.
```
Business case: two streams, one with various business metrics and the other with similar metrics but from microservice logs; the result is a reconciliation of these two streams. No operators other than a sink are needed for this particular business case.
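The timestamp rule quoted above can be checked with a few lines of arithmetic. The helper names below are hypothetical, and `windowStart` assumes tumbling windows with zero offset and non-negative timestamps; it is a sketch of the reasoning, not Flink's window assigner.

```java
final class WindowTimestampSketch {

    /** Largest timestamp that still lies in the half-open window [start, end). */
    static long maxTimestamp(long start, long end) {
        if (end <= start) {
            throw new IllegalArgumentException("window must be non-empty");
        }
        return end - 1;
    }

    /**
     * Start of the tumbling window of the given size that contains the timestamp.
     * Assumes zero offset and a non-negative timestamp.
     */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long joinedTimestamp = maxTimestamp(5, 10);
        System.out.println(joinedTimestamp);                 // prints 9
        System.out.println(windowStart(joinedTimestamp, 5)); // prints 5
    }
}
```

Because the joined element carries timestamp 9, a downstream operator using the same window size places it back into [5, 10), which is why the window itself does not need to be attached to the result.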
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590245#comment-16590245 ]

Aljoscha Krettek commented on FLINK-10050:

I think {{DataStream.join()}} and {{DataStream.coGroup()}} are a bit of a dead end because they don't allow getting any information about what window the result is in, or other meta information about the window that you would get from a {{ProcessWindowFunction}}. I'm interested if you have a use case for this, where you don't need to know what window your result is in.
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568318#comment-16568318 ]

eugen yushin commented on FLINK-10050:

I've experimented a bit with `allowedLateness` and cogroups and have an implementation. Let's discuss whether anyone has concerns about this so I can proceed with a PR.

> Support 'allowedLateness' in CoGroupedStreams
> ---------------------------------------------
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.1
> Reporter: eugen yushin
> Priority: Major