Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1704#discussion_r55995217
--- Diff:
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/OnWindowedStream.scala
---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+class OnWindowedStream[T, K, W <: Window](ds: WindowedStream[T, K, W]) {
+
+ /**
+ * Applies a reduce function to the window. The window function is
called for each evaluation
+ * of the window for each key individually. The output of the reduce
function is interpreted
+ * as a regular non-windowed stream.
+ *
+ * This window will try and pre-aggregate data as much as the window
policies permit.
+ * For example,tumbling time windows can perfectly pre-aggregate the
data, meaning that only one
+ * element per key is stored. Sliding time windows will pre-aggregate
on the granularity of the
+ * slide interval, so a few elements are stored per key (one per slide
interval).
+ * Custom windows may not be able to pre-aggregate, or may need to
store extra values in an
+ * aggregation tree.
+ *
+ * @param function The reduce function.
+ * @return The data stream that is the result of applying the reduce
function to the window.
+ */
+ def reduceWith(function: (T, T) => T) =
+ ds.reduce(function)
+
+ /**
+ * Applies the given fold function to each window. The window function
is called for each
+ * evaluation of the window for each key individually. The output of
the reduce function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * @param function The fold function.
+ * @return The data stream that is the result of applying the fold
function to the window.
+ */
+ def foldWith[R: TypeInformation](initialValue: R)(function: (R, T) => R)
=
+ ds.fold(initialValue)(function)
+
+ /**
+ * Applies the given window function to each window. The window
function is called for each
+ * evaluation of the window for each key individually. The output of
the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is incrementally aggregated using the given fold
function.
+ *
+ * @param initialValue The initial value of the fold
+ * @param foldFunction The fold function that is used for incremental
aggregation
+ * @param windowFunction The window function.
+ * @return The data stream that is the result of applying the window
function to the window.
+ */
+ def applyWith[R: TypeInformation](initialValue: R)
+ (foldFunction: (R, T) => R,
+ windowFunction: (K, W, R) =>
TraversableOnce[R]):
--- End diff --
Why does the `windowFunction` work on a single `R` element and not on all
elements of a window?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---