[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2019-07-08 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880239#comment-16880239
 ] 

Congxian Qiu(klion26) commented on FLINK-10050:
---

Thanks for your reply [~kkl0u], I've created a new issue FLINK-13148 to track 
it.

> Support 'allowedLateness' in CoGroupedStreams
> ---------------------------------------------
>
>                 Key: FLINK-10050
>                 URL: https://issues.apache.org/jira/browse/FLINK-10050
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.5.1, 1.6.0
>            Reporter: eugen yushin
>            Priority: Major
>              Labels: pull-request-available, ready-to-commit, windows
>             Fix For: 1.7.0
>
>
> WindowedStream supports the 'allowedLateness' feature, while 
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part 
> of CoGroupedStreams, and all main functionality (evictor, trigger, ...) is 
> simply delegated to WindowedStream.
> There is currently no way to handle late-arriving data from previous steps in 
> cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b because 
> CoGroupedStreams.java lacks an `allowedLateness` API call.
> Scope: add method `WithWindow.allowedLateness` to the Java API 
> (flink-streaming-java) and extend the Scala API (flink-streaming-scala).
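
To make the problem in step c concrete, here is a minimal sketch in plain Java (not Flink code) of the lateness rule as I understand the window operator to apply it: an element is dropped once the watermark has reached the window's last timestamp plus the allowed lateness. The class and method names (`LatenessSketch`, `isTooLate`) are hypothetical and only illustrate why a cogroup with zero allowed lateness discards records that an upstream aggregation with non-zero lateness still emits.

```java
/** Simplified model of how an event-time window decides that an element is too late. */
final class LatenessSketch {

    /** Last timestamp that still belongs to a window whose (exclusive) end is windowEnd. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** True if an element for this window would be dropped at the given watermark. */
    static boolean isTooLate(long windowEnd, long allowedLateness, long watermark) {
        return maxTimestamp(windowEnd) + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L;   // window covers [0, 10_000)
        long watermark = 12_000L;   // the watermark has already passed the window end

        // Without allowed lateness (the only option for cogroups before this change):
        System.out.println(isTooLate(windowEnd, 0L, watermark));     // true
        // With 5 seconds of allowed lateness the element is still accepted:
        System.out.println(isTooLate(windowEnd, 5_000L, watermark)); // false
    }
}
```

In this model, steps a and b correspond to the second call and step c to the first, which is the asymmetry the issue asks to remove.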



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2019-07-08 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880179#comment-16880179
 ] 

Kostas Kloudas commented on FLINK-10050:


Yes, we should open a new issue to track it, and please write in the 
description how you are planning to implement it so that we can chat there. 
Thanks [~klion26]!



[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2019-07-08 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880113#comment-16880113
 ] 

Congxian Qiu(klion26) commented on FLINK-10050:
---

Thanks for your reply and confirmation, [~kkl0u]. Yes, you're right. Do I need 
to open a new issue for this, or should I reuse the current issue to track it?

As for the implementation, I think we should add an input parameter of type 
{{OutputTag}} in {{CoGroupedStreams#WithWindow}}.



[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2019-07-08 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880103#comment-16880103
 ] 

Kostas Kloudas commented on FLINK-10050:


Hi [~klion26]! I agree that the user should be able to get the side output 
containing the late data. So we should essentially align the semantics and the 
functionality of CoGroupedStreams with those of WindowedStream. If I am not 
mistaken, this means exposing WindowedStream.sideOutputLateData() from 
CoGroupedStreams, right?
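
A rough illustration of the semantics discussed here, written in plain Java rather than against the Flink API: elements that arrive after the allowed lateness has expired are routed to a separate "late" collection instead of being silently dropped. Everything in this sketch (`LateSideOutputSketch`, its fields and methods) is hypothetical and only models the behaviour; it is not how Flink implements side outputs.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one window that routes too-late elements to a side output. */
final class LateSideOutputSketch {
    final long windowEnd;          // exclusive end of the window
    final long allowedLateness;    // extra time during which late elements are still accepted
    final List<String> windowContents = new ArrayList<>();
    final List<String> lateSideOutput = new ArrayList<>();
    long watermark = Long.MIN_VALUE;

    LateSideOutputSketch(long windowEnd, long allowedLateness) {
        this.windowEnd = windowEnd;
        this.allowedLateness = allowedLateness;
    }

    /** Watermarks only move forward. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
    }

    /** Adds an element to the window, or to the side output if it is too late. */
    void add(String element) {
        boolean tooLate = (windowEnd - 1) + allowedLateness <= watermark;
        if (tooLate) {
            lateSideOutput.add(element);
        } else {
            windowContents.add(element);
        }
    }

    public static void main(String[] args) {
        LateSideOutputSketch window = new LateSideOutputSketch(10_000L, 2_000L);
        window.add("a");                  // on time
        window.advanceWatermark(11_000L);
        window.add("b");                  // late, but within the allowed lateness
        window.advanceWatermark(12_000L);
        window.add("c");                  // past the allowed lateness
        System.out.println(window.windowContents);  // [a, b]
        System.out.println(window.lateSideOutput);  // [c]
    }
}
```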

 



[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2019-07-07 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879986#comment-16879986
 ] 

Congxian Qiu(klion26) commented on FLINK-10050:
---

[~aljoscha] [~kkl0u] Since we now support {{allowedLateness}} here, maybe we 
should also support attaching an {{OutputTag}} to the inner WindowedStream. 
What do you think?



[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622047#comment-16622047
 ] 

ASF GitHub Bot commented on FLINK-10050:


asfgit closed pull request #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 55009e1b4cb..c8b552708c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -20,6 +20,7 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -40,6 +41,7 @@
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -183,7 +185,7 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> ke
 			 */
 			@PublicEvolving
 			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
-				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
 			}
 		}
 	}
@@ -215,6 +217,12 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> ke
 
 		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
 
+		@VisibleForTesting
+		Time allowedLateness;
+
+		@VisibleForTesting
+		WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp;
+
 		protected WithWindow(DataStream<T1> input1,
 				DataStream<T2> input2,
 				KeySelector<T1, KEY> keySelector1,
@@ -222,7 +230,8 @@ protected WithWindow(DataStream<T1> input1,
 				TypeInformation<KEY> keyType,
 				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
 				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
-				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
+				Time allowedLateness) {
 			this.input1 = input1;
 			this.input2 = input2;
 
@@ -233,6 +242,8 @@ protected WithWindow(DataStream<T1> input1,
 			this.windowAssigner = windowAssigner;
 			this.trigger = trigger;
 			this.evictor = evictor;
+
+			this.allowedLateness = allowedLateness;
 		}
 
 		/**
@@ -241,7 +252,7 @@ protected WithWindow(DataStream<T1> input1,
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, newTrigger, evictor);
+					windowAssigner, newTrigger, evictor, allowedLateness);
 		}
 
 		/**
@@ -254,7 +265,18 @@ protected WithWindow(DataStream<T1> input1,
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, trigger, newEvictor);
+					windowAssigner, trigger, newEvictor, allowedLateness);
+		}
+
+		/**
+		 * Sets the time by which elements are allowed to be late.
+		 * @see WindowedStream#allowedLateness(Time)
+ 

[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621912#comment-16621912
 ] 

ASF GitHub Bot commented on FLINK-10050:


EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-423161496
 
 
   @kl0u Great news!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620532#comment-16620532
 ] 

ASF GitHub Bot commented on FLINK-10050:


EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422789390
 
 
   Nice, the CI checks have passed. I don't think I have access to Travis to 
relaunch builds (at least, I don't see any button there for that kind of 
operation).
   Thanks for your cooperation.




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620456#comment-16620456
 ] 

ASF GitHub Bot commented on FLINK-10050:


kl0u commented on issue #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422757135
 
 
   I would recommend relaunching that specific build to see if it happens 
again.




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620450#comment-16620450
 ] 

ASF GitHub Bot commented on FLINK-10050:


EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422754323
 
 
   I see that one Travis build finished with an error:
   ```
   10:15:13.087 [ERROR] Failed to execute goal on project flink-storm-examples_2.11: Could not resolve dependencies for project org.apache.flink:flink-storm-examples_2.11:jar:1.7-SNAPSHOT: Could not transfer artifact org.apache.storm:storm-starter:jar:1.0.0 from/to central (http://repo.maven.apache.org/maven2): GET request of: org/apache/storm/storm-starter/1.0.0/storm-starter-1.0.0.jar from central failed: Connection reset -> [Help 1]
   ```
   
   It looks unrelated to the proposed changes.
   What is the best way to deal with this kind of failure during CI?
   Is there any way to rerun the PR build?




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620435#comment-16620435
 ] 

ASF GitHub Bot commented on FLINK-10050:


EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422746729
 
 
   @kl0u I didn't want to ping you until Travis had finished.
   Thanks for the quick turnaround.




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620410#comment-16620410
 ] 

ASF GitHub Bot commented on FLINK-10050:


kl0u commented on issue #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-422741692
 
 
   Thanks for updating the PR @EugeneYushin . I will have a look today.




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618880#comment-16618880
 ] 

ASF GitHub Bot commented on FLINK-10050:


kl0u commented on a change in pull request #6646: [FLINK-10050] Support 
allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r218378742
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ##
 @@ -239,7 +245,17 @@ protected WithWindow(DataStream<T1> input1,
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, trigger, newEvictor);
+					windowAssigner, trigger, newEvictor, allowedLateness);
+		}
+
+		/**
+		 * Sets the time by which elements are allowed to be late.
+		 * @see WindowedStream#allowedLateness(Time)
+		 */
+		@PublicEvolving
+		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
 
 Review comment:
   I replied in the Mailing List.




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618785#comment-16618785
 ] 

ASF GitHub Bot commented on FLINK-10050:


EugeneYushin commented on a change in pull request #6646: [FLINK-10050] Support 
allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r218365268
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ##
 @@ -239,7 +245,17 @@ protected WithWindow(DataStream<T1> input1,
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, trigger, newEvictor);
+					windowAssigner, trigger, newEvictor, allowedLateness);
+		}
+
+		/**
+		 * Sets the time by which elements are allowed to be late.
+		 * @see WindowedStream#allowedLateness(Time)
+		 */
+		@PublicEvolving
+		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
 
 Review comment:
   @kl0u can you please take a look at 
https://lists.apache.org/list.html?d...@flink.apache.org
   I'm in the process of writing unit tests, and I can't get stable results 
for now.
   
   Do you have any thoughts on this topic?




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616355#comment-16616355
 ] 

ASF GitHub Bot commented on FLINK-10050:


kl0u commented on a change in pull request #6646: [FLINK-10050] Support 
allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217887352
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ##
 @@ -239,7 +245,17 @@ protected WithWindow(DataStream<T1> input1,
         @PublicEvolving
         public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
             return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-                    windowAssigner, trigger, newEvictor);
+                    windowAssigner, trigger, newEvictor, allowedLateness);
+        }
+
+        /**
+         * Sets the time by which elements are allowed to be late.
+         * @see WindowedStream#allowedLateness(Time)
+         */
+        @PublicEvolving
+        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
 
 Review comment:
   The `null` check should verify that the user-specified argument 
(`newLateness`) is not null, not that the class field `allowedLateness` is not 
null. `allowedLateness` can be null, as this is the default value. This check 
does not break anything, and it should be there to signal that if you call the 
method, you cannot pass `null` as an argument.
   
   Nobody said to put checks for the `evictor` and the `trigger`.
   
   In addition, you should add some tests in the PR. This is not only to test 
if everything works, but also to guarantee that nobody in the future will break 
this functionality.
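
A minimal sketch of the distinction described above, assuming a simplified builder: the field may stay `null` (meaning "not set"), while the setter rejects a `null` argument. `WithWindowSketch` is a hypothetical stand-in, `java.time.Duration` replaces Flink's `Time`, and `Objects.requireNonNull` stands in for whatever precondition helper the project prefers.

```java
import java.time.Duration;
import java.util.Objects;

public class WithWindowSketch {

    // null is the default and means "the user did not configure lateness".
    private final Duration allowedLateness;

    public WithWindowSketch() {
        this(null);
    }

    private WithWindowSketch(Duration allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Returns a new instance with the given lateness; the argument must not be null. */
    public WithWindowSketch allowedLateness(Duration newLateness) {
        // Validate the argument, not the field: the field is allowed to be null.
        Objects.requireNonNull(newLateness, "newLateness must not be null");
        return new WithWindowSketch(newLateness);
    }

    public Duration getAllowedLateness() {
        return allowedLateness;
    }

    public static void main(String[] args) {
        WithWindowSketch unset = new WithWindowSketch();
        System.out.println(unset.getAllowedLateness()); // null

        WithWindowSketch configured = unset.allowedLateness(Duration.ofSeconds(5));
        System.out.println(configured.getAllowedLateness()); // PT5S
    }
}
```

Never calling the setter leaves the default in place and does not trigger the check; only an explicit `allowedLateness(null)` call fails.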




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616225#comment-16616225
 ] 

ASF GitHub Bot commented on FLINK-10050:


EugeneYushin commented on a change in pull request #6646: [FLINK-10050] Support 
allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217879647
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ##
 @@ -239,7 +245,17 @@ protected WithWindow(DataStream<T1> input1,
         @PublicEvolving
         public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
             return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-                    windowAssigner, trigger, newEvictor);
+                    windowAssigner, trigger, newEvictor, allowedLateness);
+        }
+
+        /**
+         * Sets the time by which elements are allowed to be late.
+         * @see WindowedStream#allowedLateness(Time)
+         */
+        @PublicEvolving
+        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
 
 Review comment:
   A null check in this place breaks the current logic of the CoGroup/Join 
classes. CoGroup has no null checks directly in the `evictor`/`trigger` methods 
and validates during delegation instead:
   
https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L344
   
   I do the null check there for the `allowedLateness` field as well.
   At the same time, Join.apply delegates to CoGroup.apply:
   
https://github.com/apache/flink/blob/98412a5f7227d7694c727847727f9434bcca4e92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java#L314
 
   
   To be consistent, we should also add null checks for evictor and trigger. 
Adding null checks directly in setters/constructors breaks the chain of calls in 
the `apply` methods (for evictor/trigger/allowedLateness) and requires clumsy 
if-else conditions for each nullable field separately.
   
   Both CoGroup and Join allow null for trigger/evictor (and I've added 
allowedLateness following the same approach) but don't pass validation during 
calls to `apply(...)`.
   
   With a null check for `allowedLateness` inside the setter, we get errors in 
the following scenario:
   
https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala#L151
   
   It breaks when the user doesn't specify any of 
trigger/evictor/allowedLateness. At the same time, these fields are optional 
and have defaults in WindowedStream. Unfortunately, the default for 
allowedLateness in WindowedStream is private (and I don't think it's good 
practice to fall back to a default when the user passed null by mistake).
   
   Please let me know your thoughts.
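
The "validate during delegation" approach described in this comment can be sketched as follows. This is an illustration only: `DelegationSketch` and `InnerWindow` are hypothetical stand-ins for the CoGroup `WithWindow` class and the inner `WindowedStream`, a `String` replaces the trigger type, and `java.time.Duration` replaces Flink's `Time`. Optional settings stay `null` until `apply()`, which forwards only the ones the user actually set, so the inner defaults survive.

```java
import java.time.Duration;

public class DelegationSketch {

    /** Hypothetical inner stream that owns the defaults. */
    public static final class InnerWindow {
        public String trigger = "default-trigger";
        public Duration allowedLateness = Duration.ZERO;
    }

    // Optional settings; null means "keep the inner default".
    private final String trigger;
    private final Duration allowedLateness;

    public DelegationSketch(String trigger, Duration allowedLateness) {
        this.trigger = trigger;
        this.allowedLateness = allowedLateness;
    }

    /** Builds the inner window and forwards only the explicitly set options. */
    public InnerWindow apply() {
        InnerWindow inner = new InnerWindow();
        if (trigger != null) {
            inner.trigger = trigger;
        }
        if (allowedLateness != null) {
            inner.allowedLateness = allowedLateness;
        }
        return inner;
    }

    public static void main(String[] args) {
        // Nothing configured: inner defaults are kept.
        InnerWindow defaults = new DelegationSketch(null, null).apply();
        System.out.println(defaults.trigger + " " + defaults.allowedLateness);

        // Only lateness configured: the trigger default is kept.
        InnerWindow custom = new DelegationSketch(null, Duration.ofSeconds(3)).apply();
        System.out.println(custom.trigger + " " + custom.allowedLateness);
    }
}
```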




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614876#comment-16614876
 ] 

ASF GitHub Bot commented on FLINK-10050:


kl0u commented on a change in pull request #6646: [FLINK-10050] Support 
allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217717346
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ##
 @@ -239,7 +245,17 @@ protected WithWindow(DataStream<T1> input1,
         @PublicEvolving
         public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
             return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-                    windowAssigner, trigger, newEvictor);
+                    windowAssigner, trigger, newEvictor, allowedLateness);
+        }
+
+        /**
+         * Sets the time by which elements are allowed to be late.
+         * @see WindowedStream#allowedLateness(Time)
+         */
+        @PublicEvolving
+        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
 
 Review comment:
   Check for `null` argument.




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614875#comment-16614875
 ] 

ASF GitHub Bot commented on FLINK-10050:


kl0u commented on a change in pull request #6646: [FLINK-10050] Support 
allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#discussion_r217716951
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 ##
 @@ -254,7 +260,17 @@ protected WithWindow(DataStream<T1> input1,
         @PublicEvolving
         public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
             return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-                    windowAssigner, trigger, newEvictor);
+                    windowAssigner, trigger, newEvictor, allowedLateness);
+        }
+
+        /**
+         * Sets the time by which elements are allowed to be late.
+         * @see WindowedStream#allowedLateness(Time)
+         */
+        @PublicEvolving
+        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
+            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
 
 Review comment:
   Check for `null`. If the user calls `allowedLateness`, they should not pass 
a `null` value.




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614842#comment-16614842
 ] 

ASF GitHub Bot commented on FLINK-10050:


kl0u commented on issue #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-421359975
 
 
   Hi @EugeneYushin . This PR has no tests. Please add them.




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602400#comment-16602400
 ] 

ASF GitHub Bot commented on FLINK-10050:


EugeneYushin commented on issue #6646: [FLINK-10050] Support allowedLateness in 
CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646#issuecomment-418175648
 
 
   @aljoscha @tillrohrmann 
   Could you please take a look at this?




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-02 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601172#comment-16601172
 ] 

eugen yushin commented on FLINK-10050:
--

[~aljoscha], [~till.rohrmann]
Could you please take a look at the PR?
I didn't add unit tests because:
a. there are no mock tests for the referenced files in the master branch that 
cover delegates such as evictor/trigger/...
b. 'allowedLateness' is a feature of WindowedStream, and the proposed fix simply 
delegates all the work to the WindowedStream logic

Regards



[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-09-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601166#comment-16601166
 ] 

ASF GitHub Bot commented on FLINK-10050:


EugeneYushin opened a new pull request #6646: [FLINK-10050] Support 
allowedLateness in CoGroupedStreams
URL: https://github.com/apache/flink/pull/6646
 
 
   ## What is the purpose of the change
   
[https://issues.apache.org/jira/browse/FLINK-10050](https://issues.apache.org/jira/browse/FLINK-10050)
   Add 'allowedLateness' method to coGroup and join streams API.
   
   
   ## Brief change log
   
   - add 'allowedLateness' for CoGroupedStreams/JoinedStreams java and scala API
   - delegate calls to underlying WindowedStream (as for Trigger/Evictor 
scenario)
   
   
   ## Verifying this change
   
   This change is a trivial rework.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   




[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-30 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16597537#comment-16597537
 ] 

eugen yushin commented on FLINK-10050:
--

thx, I'm proceeding with PR then
will keep you posted

> Support 'allowedLateness' in CoGroupedStreams
> -
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.1, 1.6.0
>Reporter: eugen yushin
>Priority: Major
>  Labels: ready-to-commit, windows
>
> WindowedStream supports the 'allowedLateness' feature, while 
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part 
> of CoGroupedStreams, and all main functionality (like evictor/trigger/...) is 
> simply delegated to WindowedStream.
> As a result, there is no way to handle late-arriving data from previous steps 
> in cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b because 
> CoGroupedStreams.java has no `allowedLateness` API call.
> Scope: add method `WithWindow.allowedLateness` to the Java API 
> (flink-streaming-java) and extend the Scala API (flink-streaming-scala).





[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-30 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16597246#comment-16597246
 ] 

Aljoscha Krettek commented on FLINK-10050:
--

I see. I think then this addition would be ok. 👌



[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-24 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591511#comment-16591511
 ] 

eugen yushin commented on FLINK-10050:
--

[~aljoscha] No operator in Flink retains window information in its results. Docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results
```
The result of a windowed operation is again a {{DataStream}}, no information 
about the windowed operations is retained in the result elements
```

At the same time, coGroup/join preserves element timestamps, so downstream 
operators can assign elements to their respective windows. Docs:
[https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join]
```
Those elements that do get joined will have as their timestamp the largest 
timestamp that still lies in the respective window. For example a window with 
{{[5, 10)}} as its boundaries would result in the joined elements having 9 as 
their timestamp.
```
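
The arithmetic behind the quoted example can be sketched in plain Java. The helper names below are hypothetical, and the downstream assignment assumes a tumbling window with zero offset; the point is only that a joined element stamped with the window's largest timestamp falls back into the same window when a later operator windows it again with the same size.

```java
public class JoinedTimestampSketch {

    /** Largest timestamp that still lies in the half-open window [start, end). */
    public static long maxTimestamp(long start, long end) {
        if (end <= start) {
            throw new IllegalArgumentException("window end must be greater than start");
        }
        return end - 1;
    }

    /** Start of the tumbling window (zero offset) that contains the timestamp. */
    public static long tumblingWindowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long joinedTimestamp = maxTimestamp(5, 10);
        System.out.println(joinedTimestamp); // 9

        // A downstream tumbling window of size 5 puts timestamp 9 into [5, 10).
        long start = tumblingWindowStart(joinedTimestamp, 5);
        System.out.println("[" + start + ", " + (start + 5) + ")"); // [5, 10)
    }
}
```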

Business case: two streams, one carrying various business metrics and the other 
carrying similar metrics derived from microservice logs; the result is a 
reconciliation of these two streams. No operators other than a sink are needed 
for this particular business case.



[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-23 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590245#comment-16590245
 ] 

Aljoscha Krettek commented on FLINK-10050:
--

I think {{DataStream.join()}} and {{DataStream.coGroup()}} are a bit of a dead 
end because they don't allow getting any information about what window the 
result is in, or other meta information about the window that you would get 
from a {{ProcessWindowFunction}}.

I'm interested if you have a use case for this, where you don't need to know 
what window your result is in. 



[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-03 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568318#comment-16568318
 ] 

eugen yushin commented on FLINK-10050:
--

I've experimented a bit with `allowedLateness` and cogroups and have a working 
implementation. Let's discuss whether anyone has concerns about this so I can 
proceed with a PR.

> Support 'allowedLateness' in CoGroupedStreams
> -
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.1
>Reporter: eugen yushin
>Priority: Major
>
> WindowedStream supports the 'allowedLateness' feature, while 
> CoGroupedStreams does not. At the same time, WindowedStream is an inner part 
> of CoGroupedStreams, and all main functionality (like evictor/trigger/...) is 
> simply delegated to WindowedStream.
> As a result, there is no way to handle late-arriving data from previous steps 
> in cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b because 
> CoGroupedStreams.java has no `allowedLateness` API call.
> Scope: add method `WithWindow.allowedLateness` to the Java API 
> (flink-streaming-java) and extend the Scala API (flink-streaming-scala).


