Github user stefanobaghino commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r56000181
  
    --- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/OnWindowedStream.scala
 ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package 
org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +class OnWindowedStream[T, K, W <: Window](ds: WindowedStream[T, K, W]) {
    +
    +  /**
    +    * Applies a reduce function to the window. The window function is 
called for each evaluation
    +    * of the window for each key individually. The output of the reduce 
function is interpreted
    +    * as a regular non-windowed stream.
    +    *
    +    * This window will try and pre-aggregate data as much as the window 
policies permit.
    +    * For example,tumbling time windows can perfectly pre-aggregate the 
data, meaning that only one
    +    * element per key is stored. Sliding time windows will pre-aggregate 
on the granularity of the
    +    * slide interval, so a few elements are stored per key (one per slide 
interval).
    +    * Custom windows may not be able to pre-aggregate, or may need to 
store extra values in an
    +    * aggregation tree.
    +    *
    +    * @param function The reduce function.
    +    * @return The data stream that is the result of applying the reduce 
function to the window.
    +    */
    +  def reduceWith(function: (T, T) => T) =
    +    ds.reduce(function)
    +
    +  /**
    +    * Applies the given fold function to each window. The window function 
is called for each
    +    * evaluation of the window for each key individually. The output of 
the reduce function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * @param function The fold function.
    +    * @return The data stream that is the result of applying the fold 
function to the window.
    +    */
    +  def foldWith[R: TypeInformation](initialValue: R)(function: (R, T) => R) 
=
    +    ds.fold(initialValue)(function)
    +
    +  /**
    +    * Applies the given window function to each window. The window 
function is called for each
    +    * evaluation of the window for each key individually. The output of 
the window function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * Arriving data is incrementally aggregated using the given fold 
function.
    +    *
    +    * @param initialValue The initial value of the fold
    +    * @param foldFunction The fold function that is used for incremental 
aggregation
    +    * @param windowFunction The window function.
    +    * @return The data stream that is the result of applying the window 
function to the window.
    +    */
    +  def applyWith[R: TypeInformation](initialValue: R)
    +                                   (foldFunction: (R, T) => R,
    +                                    windowFunction: (K, W, R) => 
TraversableOnce[R]):
    --- End diff --
    
    The implementation iterates over each `R` item. However, this approach 
seems to contrast with [the remarks you have on 
`it.to[Seq]`](https://github.com/apache/flink/pull/1704#discussion_r55998933); 
I will restore the `Iterator` here as well. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to