[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887802#comment-15887802 ]
ASF GitHub Bot commented on FLINK-5157: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2946#discussion_r103425262 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java --- @@ -642,31 +980,71 @@ public AllWindowedStream(DataStream<T> input, * @return The data stream that is the result of applying the window function to the window. */ public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) { + String callLocation = Utils.getCallLocationName(); + function = input.getExecutionEnvironment().clean(function); TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( function, AllWindowFunction.class, true, true, getInputType(), null, false); - - return apply(function, resultType); + return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation); } /** * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. The output of the window function is + * evaluation of the window. The output of the window function is * interpreted as a regular non-windowed stream. * * <p> * Not that this function requires that all data in the windows is buffered until the window * is evaluated, as the function provides no means of incremental aggregation. * * @param function The window function. - * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. */ public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) { + String callLocation = Utils.getCallLocationName(); + function = input.getExecutionEnvironment().clean(function); + return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation); + } - //clean the closure + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Not that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of incremental aggregation. + * + * @param function The process window function. + * @return The data stream that is the result of applying the window function to the window. + */ + public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) { --- End diff -- This and the following should be `@PublicEvolving` > Extending AllWindow Function Metadata > ------------------------------------- > > Key: FLINK-5157 > URL: https://issues.apache.org/jira/browse/FLINK-5157 > Project: Flink > Issue Type: New Feature > Components: DataStream API, Streaming > Reporter: Ventura Del Monte > Assignee: Ventura Del Monte > > Following the logic behind [1,2], ProcessAllWindowFunction can be introduced > in Flink and AllWindowedStream can be extended in order to support them. > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata > [2] https://issues.apache.org/jira/browse/FLINK-4997 -- This message was sent by Atlassian JIRA (v6.3.15#6346)