[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888012#comment-15888012 ]

ASF GitHub Bot commented on FLINK-5157:
---

Github user VenturaDelMonte closed the pull request at:

https://github.com/apache/flink/pull/2946

> Extending AllWindow Function Metadata
> -
>
> Key: FLINK-5157
> URL: https://issues.apache.org/jira/browse/FLINK-5157
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, Streaming
> Reporter: Ventura Del Monte
> Assignee: Ventura Del Monte
> Fix For: 1.3.0
>
>
> Following the logic behind [1,2], ProcessAllWindowFunction can be introduced
> in Flink and AllWindowedStream can be extended in order to support them.
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> [2] https://issues.apache.org/jira/browse/FLINK-4997

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
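The issue asks for a ProcessAllWindowFunction that, following FLIP-2, gives the user function access to window metadata through a context object instead of handing it the window directly. A minimal, dependency-free sketch of that idea follows, under a simplified model: `ProcessAllWindowSketch`, `TimeWindowModel`, `Context`, `ProcessAllWindowFn`, `fire` and `countWithBounds` are hypothetical names, not Flink classes, and results are collected into a `List` where Flink's `ProcessAllWindowFunction.process` receives a `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, dependency-free model of the ProcessAllWindowFunction idea.
// None of these types are Flink classes; Flink's real interface emits via a Collector.
public class ProcessAllWindowSketch {

    /** Stand-in for a time window covering [start, end). */
    public static final class TimeWindowModel {
        public final long start;
        public final long end;

        public TimeWindowModel(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Metadata handed to the user function; new accessors could be added here later. */
    public interface Context {
        TimeWindowModel window();
    }

    /** Simplified process-style function for a non-keyed (all) window. */
    public interface ProcessAllWindowFn<IN, OUT> {
        void process(Context ctx, Iterable<IN> elements, List<OUT> out);
    }

    /** Fires one window: every buffered element goes to a single invocation. */
    public static <IN, OUT> List<OUT> fire(TimeWindowModel window, List<IN> buffered,
                                           ProcessAllWindowFn<IN, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        Context ctx = () -> window; // the context exposes the window as metadata
        fn.process(ctx, buffered, out);
        return out;
    }

    /** Example function: counts the elements and reports the window bounds. */
    public static ProcessAllWindowFn<Integer, String> countWithBounds() {
        return (ctx, elements, out) -> {
            int count = 0;
            for (Integer ignored : elements) {
                count++;
            }
            out.add("window [" + ctx.window().start + ", " + ctx.window().end + ") count=" + count);
        };
    }

    public static void main(String[] args) {
        List<String> result = fire(new TimeWindowModel(0L, 1000L), Arrays.asList(3, 5, 7), countWithBounds());
        System.out.println(result); // prints [window [0, 1000) count=3]
    }
}
```

Routing metadata through `Context` means accessors can be added to it later without changing the user-facing `process` signature, which is how we read the motivation in FLIP-2.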
[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887977#comment-15887977 ]

ASF GitHub Bot commented on FLINK-5157:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2946

Thanks a lot for working on this! I just merged it. Could you please close this PR?
[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887802#comment-15887802 ]

ASF GitHub Bot commented on FLINK-5157:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2946#discussion_r103425262

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
@@ -642,31 +980,71 @@ public AllWindowedStream(DataStream input,
      * @return The data stream that is the result of applying the window function to the window.
      */
     public SingleOutputStreamOperator apply(AllWindowFunction function) {
+        String callLocation = Utils.getCallLocationName();
+        function = input.getExecutionEnvironment().clean(function);
         TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType(
             function, AllWindowFunction.class, true, true, getInputType(), null, false);
-
-        return apply(function, resultType);
+        return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
     }
     /**
      * Applies the given window function to each window. The window function is called for each
-     * evaluation of the window for each key individually. The output of the window function is
+     * evaluation of the window. The output of the window function is
      * interpreted as a regular non-windowed stream.
      *
      *
      * Not that this function requires that all data in the windows is buffered until the window
      * is evaluated, as the function provides no means of incremental aggregation.
      *
      * @param function The window function.
-     * @param resultType Type information for the result type of the window function
      * @return The data stream that is the result of applying the window function to the window.
      */
     public SingleOutputStreamOperator apply(AllWindowFunction function, TypeInformation resultType) {
+        String callLocation = Utils.getCallLocationName();
+        function = input.getExecutionEnvironment().clean(function);
+        return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
+    }
-        //clean the closure
+    /**
+     * Applies the given window function to each window. The window function is called for each
+     * evaluation of the window. The output of the window function is
+     * interpreted as a regular non-windowed stream.
+     *
+     *
+     * Not that this function requires that all data in the windows is buffered until the window
+     * is evaluated, as the function provides no means of incremental aggregation.
+     *
+     * @param function The process window function.
+     * @return The data stream that is the result of applying the window function to the window.
+     */
+    public SingleOutputStreamOperator process(ProcessAllWindowFunction function) {
--- End diff --

This and the following should be `@PublicEvolving`
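In the diff, both `apply(AllWindowFunction ...)` overloads now wrap the user function in `InternalIterableAllWindowFunction` and delegate to one internal `apply(..., resultType, callLocation)`, and the PR description says new `InternalWindowFunction` subclasses handle `ProcessAllWindowFunction` the same way. The plain-Java sketch below models only that adapter pattern; closure cleaning (`clean(function)`) and `Utils.getCallLocationName()` are not modeled, and every type and method in it (`InternalWrapperSketch`, `Win`, `UserAllWindowFn`, `UserProcessAllWindowFn`, `InternalFn`, `wrapApply`, `wrapProcess`, `runInternal`) is a hypothetical stand-in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the adapter pattern in the diff: each user-facing function type is
// wrapped into one internal interface so a single internal path runs every window.
// All names are illustrative stand-ins, not Flink classes.
public class InternalWrapperSketch {

    /** Stand-in for a window, reduced to its end timestamp. */
    public static final class Win {
        public final long end;

        public Win(long end) {
            this.end = end;
        }
    }

    /** "apply"-style user function: receives the window object directly. */
    public interface UserAllWindowFn<IN, OUT> {
        void apply(Win window, Iterable<IN> values, List<OUT> out);
    }

    /** "process"-style user function: receives a context that carries the window. */
    public interface UserProcessAllWindowFn<IN, OUT> {
        interface Context {
            Win window();
        }

        void process(Context ctx, Iterable<IN> values, List<OUT> out);
    }

    /** The single internal contract that every user function is adapted to. */
    interface InternalFn<IN, OUT> {
        void run(Win window, Iterable<IN> values, List<OUT> out);
    }

    // Plays the role InternalIterableAllWindowFunction has in the diff.
    static <IN, OUT> InternalFn<IN, OUT> wrapApply(UserAllWindowFn<IN, OUT> fn) {
        return fn::apply;
    }

    // Assumed counterpart for process-style functions: builds a context around the window.
    static <IN, OUT> InternalFn<IN, OUT> wrapProcess(UserProcessAllWindowFn<IN, OUT> fn) {
        return (window, values, out) -> fn.process(() -> window, values, out);
    }

    public static <IN, OUT> List<OUT> apply(Win w, List<IN> buffered, UserAllWindowFn<IN, OUT> fn) {
        return runInternal(w, buffered, wrapApply(fn));
    }

    public static <IN, OUT> List<OUT> process(Win w, List<IN> buffered, UserProcessAllWindowFn<IN, OUT> fn) {
        return runInternal(w, buffered, wrapProcess(fn));
    }

    // The only place that evaluates a window, whatever the user function type.
    private static <IN, OUT> List<OUT> runInternal(Win w, List<IN> buffered, InternalFn<IN, OUT> internal) {
        List<OUT> out = new ArrayList<>();
        internal.run(w, buffered, out);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);
        Win w = new Win(100L);
        List<Integer> sums = apply(w, data, (window, values, out) -> {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            out.add(sum);
        });
        List<Long> ends = process(w, data, (ctx, values, out) -> out.add(ctx.window().end));
        System.out.println(sums + " " + ends); // prints [6] [100]
    }
}
```

With one internal contract, operator construction is written once; supporting another user-facing function type only needs one more adapter.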
[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887803#comment-15887803 ]

ASF GitHub Bot commented on FLINK-5157:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2946#discussion_r103425179

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
@@ -478,6 +581,135 @@ public AllWindowedStream(DataStream input,
         return input.transform(opName, resultType, operator).forceNonParallel();
     }
+    /**
+     * Applies the given window function to each window. The window function is called for each
+     * evaluation of the window for each key individually. The output of the window function is
+     * interpreted as a regular non-windowed stream.
+     *
+     * Arriving data is incrementally aggregated using the given aggregate function. This means
+     * that the window function typically has only a single value to process when called.
+     *
+     * @param aggFunction The aggregate function that is used for incremental aggregation.
+     * @param windowFunction The process window function.
+     *
+     * @return The data stream that is the result of applying the window function to the window.
+     *
+     * @param The type of the AggregateFunction's accumulator
+     * @param The type of AggregateFunction's result, and the WindowFunction's input
+     * @param The type of the elements in the resulting stream, equal to the
+     *        WindowFunction's result type
+     */
+    public SingleOutputStreamOperator aggregate(
--- End diff --

These should also be `@PublicEvolving`. (I know that the corresponding methods on `WindowedStream` weren't marked like this due to an oversight on the implementers' side.) No problem.
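The `aggregate(aggFunction, windowFunction)` Javadoc in the diff says arriving data is aggregated incrementally, so the window function typically sees a single value. A small model of that behavior, assuming hypothetical `IncrementalAggregateSketch`, `AggFn`, `ProcessFn` and `WindowState` types that only loosely mirror Flink's `AggregateFunction` (`createAccumulator`/`add`/`getResult`, with accumulator merging omitted) and `ProcessAllWindowFunction`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of aggregate(aggFunction, processWindowFunction) on a non-keyed window.
// AggFn and ProcessFn are simplified stand-ins, not Flink's AggregateFunction/ProcessAllWindowFunction.
public class IncrementalAggregateSketch {

    /** Loosely modeled on an aggregate function: accumulator lifecycle only, no merging. */
    public interface AggFn<IN, ACC, V> {
        ACC createAccumulator();

        ACC add(IN value, ACC acc);

        V getResult(ACC acc);
    }

    /** Process-style window function; receives the window end as its only metadata. */
    public interface ProcessFn<V, R> {
        void process(long windowEnd, Iterable<V> elements, List<R> out);
    }

    /** Per-window state: only the accumulator is kept, never the raw elements. */
    public static final class WindowState<IN, ACC, V> {
        private final AggFn<IN, ACC, V> agg;
        private ACC acc;

        public WindowState(AggFn<IN, ACC, V> agg) {
            this.agg = agg;
            this.acc = agg.createAccumulator();
        }

        /** Called per arriving element: folds it into the accumulator immediately. */
        public void onElement(IN value) {
            acc = agg.add(value, acc);
        }

        /** Called when the window fires: the function gets exactly one pre-aggregated value. */
        public <R> List<R> fire(long windowEnd, ProcessFn<V, R> fn) {
            List<R> out = new ArrayList<>();
            fn.process(windowEnd, Collections.singletonList(agg.getResult(acc)), out);
            return out;
        }
    }

    /** Example aggregate: average of ints, with a {sum, count} accumulator. */
    public static AggFn<Integer, long[], Double> average() {
        return new AggFn<Integer, long[], Double>() {
            @Override
            public long[] createAccumulator() {
                return new long[] {0L, 0L};
            }

            @Override
            public long[] add(Integer value, long[] acc) {
                acc[0] += value;
                acc[1]++;
                return acc;
            }

            @Override
            public Double getResult(long[] acc) {
                return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
            }
        };
    }

    public static void main(String[] args) {
        WindowState<Integer, long[], Double> state = new WindowState<>(average());
        for (int v : new int[] {2, 4, 9}) {
            state.onElement(v);
        }
        List<String> out = state.fire(1000L, (end, averages, collector) -> {
            for (Double avg : averages) {
                collector.add("avg=" + avg + " @ " + end);
            }
        });
        System.out.println(out); // prints [avg=5.0 @ 1000]
    }
}
```

Only the `{sum, count}` accumulator is stored per window rather than every element, which is the memory advantage over a plain `apply`/`process` call that must buffer all elements until the window fires.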
[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887804#comment-15887804 ]

ASF GitHub Bot commented on FLINK-5157:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2946#discussion_r103427991

--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---
@@ -199,6 +198,62 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     asScalaStream(javaStream.reduce(reducer, applyFunction, returnType))
   }
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The process window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  def reduce[R: TypeInformation](
--- End diff --

The new methods should be `@PublicEvolving`. I know the existing methods aren't, but they should be and are only like this due to oversights. That's not an error on your side.
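The Scala `reduce[R: TypeInformation](preAggregator, windowFunction)` overload follows the same pattern with a reduce function: each element is combined with a running value of the input type, and the process function later receives that one value. The sketch is written in Java to match the other examples; `java.util.function.BinaryOperator` stands in for Flink's `ReduceFunction`, and `ReducePreAggregationSketch`, `ReducingWindow` and `ProcessFn` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model of reduce(preAggregator, processWindowFunction) on a non-keyed window.
// BinaryOperator stands in for Flink's ReduceFunction; ProcessFn is not a Flink type.
public class ReducePreAggregationSketch {

    /** Process-style window function receiving the window bounds as metadata. */
    public interface ProcessFn<T, R> {
        void process(long windowStart, long windowEnd, Iterable<T> elements, List<R> out);
    }

    /** Per-window state holding a single running value of the input type. */
    public static final class ReducingWindow<T> {
        private final BinaryOperator<T> reducer;
        private T current; // null until the first element arrives

        public ReducingWindow(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        /** Combines each arriving element with the running value right away. */
        public void onElement(T value) {
            current = (current == null) ? value : reducer.apply(current, value);
        }

        /** On firing, the function sees one reduced value; in this model an empty window emits nothing. */
        public <R> List<R> fire(long start, long end, ProcessFn<T, R> fn) {
            List<R> out = new ArrayList<>();
            if (current != null) {
                fn.process(start, end, Collections.singletonList(current), out);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        ReducingWindow<Integer> window = new ReducingWindow<>(Math::max);
        for (int v : new int[] {7, 3, 11, 5}) {
            window.onElement(v);
        }
        List<String> out = window.fire(0L, 60_000L, (start, end, maxes, collector) ->
                collector.add("max in [" + start + ", " + end + ") = " + maxes.iterator().next()));
        System.out.println(out); // prints [max in [0, 60000) = 11]
    }
}
```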
[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880789#comment-15880789 ]

ASF GitHub Bot commented on FLINK-5157:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2946

Thanks, I'll have a look!
[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878252#comment-15878252 ]

ASF GitHub Bot commented on FLINK-5157:
---

Github user VenturaDelMonte commented on the issue:

https://github.com/apache/flink/pull/2946

I updated the PR according to what was developed in #328. Please note that I also took [FLINK-5741](https://issues.apache.org/jira/browse/FLINK-5741) into account.

CC @aljoscha @manuzhang
[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722392#comment-15722392 ]

ASF GitHub Bot commented on FLINK-5157:
---

GitHub user VenturaDelMonte opened a pull request:

https://github.com/apache/flink/pull/2946

[FLINK-5157] [streaming] Extend AllWindow function metadata

This PR aims to extend AllWindow function metadata similarly to [FLINK-4997](https://github.com/apache/flink/pull/2756). In brief, a ProcessAllWindowFunction supporting window context metadata has been introduced, and the AllWindowedStream apply/fold/reduce methods have been overloaded to support this new function in both Scala and Java. In addition, new InternalWindowFunction subclasses have been added to handle ProcessAllWindowFunctions internally.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/VenturaDelMonte/flink flink-5157

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2946.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2946

commit 34c54dbbeb4da546ba003ffdc8e37f4e7bb63d21
Author: Ventura Del Monte
Date: 2016-12-05T08:07:42Z

[FLINK-5157] [streaming] Extend AllWindow function metadata

> Extending AllWindow Function Metadata
> -
>
> Key: FLINK-5157
> URL: https://issues.apache.org/jira/browse/FLINK-5157
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, Streaming, Windowing Operators
> Reporter: Ventura Del Monte
> Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> Following the logic behind [1,2], ProcessAllWindowFunction can be introduced
> in Flink and AllWindowedStream can be extended in order to support them.
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> [2] https://issues.apache.org/jira/browse/FLINK-4997

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
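The PR description says the `apply`/`fold`/`reduce` methods were overloaded for the new function. The fold variant differs from reduce in that it starts from an initial value and its running value may have a different type than the input. A minimal model, assuming `java.util.function.BiFunction` in place of Flink's `FoldFunction` and hypothetical names `FoldPreAggregationSketch`, `FoldingWindow` and `ProcessFn`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of fold(initialValue, foldFunction, processWindowFunction) on a non-keyed window.
// BiFunction stands in for Flink's FoldFunction; ProcessFn is not a Flink type.
public class FoldPreAggregationSketch {

    /** Process-style window function receiving the window end as metadata. */
    public interface ProcessFn<ACC, R> {
        void process(long windowEnd, Iterable<ACC> elements, List<R> out);
    }

    /** Per-window state: starts at the initial value; the folded type may differ from the input type. */
    public static final class FoldingWindow<IN, ACC> {
        private final BiFunction<ACC, IN, ACC> folder;
        private ACC current;

        public FoldingWindow(ACC initialValue, BiFunction<ACC, IN, ACC> folder) {
            this.current = initialValue;
            this.folder = folder;
        }

        /** Folds each arriving element into the running value. */
        public void onElement(IN value) {
            current = folder.apply(current, value);
        }

        /** On firing, the function receives only the folded value. */
        public <R> List<R> fire(long windowEnd, ProcessFn<ACC, R> fn) {
            List<R> out = new ArrayList<>();
            fn.process(windowEnd, Collections.singletonList(current), out);
            return out;
        }
    }

    public static void main(String[] args) {
        // Folds Strings into their total length: input type String, folded type Integer.
        FoldingWindow<String, Integer> window = new FoldingWindow<>(0, (acc, s) -> acc + s.length());
        for (String s : new String[] {"flink", "window", "fold"}) {
            window.onElement(s);
        }
        List<String> out = window.fire(5000L, (end, totals, collector) -> {
            for (Integer total : totals) {
                collector.add("chars=" + total + " @ " + end);
            }
        });
        System.out.println(out); // prints [chars=15 @ 5000]
    }
}
```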