Repository: flink
Updated Branches:
  refs/heads/master 89de78c72 -> 4f8d01fba


[FLINK-7660] Support sideOutput in ProcessAllWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39682c45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39682c45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39682c45

Branch: refs/heads/master
Commit: 39682c456a211a773014474e696babff898a76fe
Parents: 89de78c
Author: Bowen Li <[email protected]>
Authored: Wed Sep 27 23:09:20 2017 -0700
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Oct 12 11:12:33 2017 +0200

----------------------------------------------------------------------
 .../FoldApplyProcessAllWindowFunction.java      |  7 +--
 .../InternalProcessApplyAllWindowContext.java   | 13 ++++--
 .../windowing/ProcessAllWindowFunction.java     |  9 ++++
 .../ReduceApplyProcessAllWindowFunction.java    |  6 +--
 .../operators/windowing/WindowOperator.java     |  1 +
 .../InternalProcessAllWindowContext.java        |  6 +++
 .../function/ProcessAllWindowFunction.scala     |  6 +++
 .../ScalaProcessWindowFunctionWrapper.scala     |  4 ++
 .../streaming/api/scala/SideOutputITCase.scala  | 45 ++++++++++++++++++++
 .../streaming/runtime/SideOutputITCase.java     | 37 ++++++++++++++++
 10 files changed, 121 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 362956d..591e2af 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -106,8 +106,7 @@ public class FoldApplyProcessAllWindowFunction<W extends 
Window, T, ACC, R>
                }
 
                this.ctx.window = context.window();
-               this.ctx.windowState = context.windowState();
-               this.ctx.globalState = context.globalState();
+               this.ctx.context = context;
 
                windowFunction.process(ctx, Collections.singletonList(result), 
out);
        }
@@ -115,8 +114,7 @@ public class FoldApplyProcessAllWindowFunction<W extends 
Window, T, ACC, R>
        @Override
        public void clear(final Context context) throws Exception {
                this.ctx.window = context.window();
-               this.ctx.windowState = context.windowState();
-               this.ctx.globalState = context.globalState();
+               this.ctx.context = context;
                windowFunction.clear(ctx);
        }
 
@@ -136,5 +134,4 @@ public class FoldApplyProcessAllWindowFunction<W extends 
Window, T, ACC, R>
 
                serializedInitialValue = baos.toByteArray();
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
index a27d71b..98557ac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal reusable context wrapper.
@@ -34,8 +35,7 @@ public class InternalProcessApplyAllWindowContext<IN, OUT, W 
extends Window>
        extends ProcessAllWindowFunction<IN, OUT, W>.Context {
 
        W window;
-       KeyedStateStore windowState;
-       KeyedStateStore globalState;
+       ProcessAllWindowFunction.Context context;
 
        InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, 
W> function) {
                function.super();
@@ -48,11 +48,16 @@ public class InternalProcessApplyAllWindowContext<IN, OUT, 
W extends Window>
 
        @Override
        public KeyedStateStore windowState() {
-               return windowState;
+               return context.windowState();
        }
 
        @Override
        public KeyedStateStore globalState() {
-               return globalState;
+               return context.globalState();
+       }
+
+       @Override
+       public <X> void output(OutputTag<X> outputTag, X value) {
+               context.output(outputTag, value);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 34a37bf..f27f3c0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Base abstract class for functions that are evaluated over non-keyed windows 
using a context
@@ -77,5 +78,13 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W 
extends Window> extend
                 * State accessor for per-key global state.
                 */
                public abstract KeyedStateStore globalState();
+
+               /**
+                * Emits a record to the side output identified by the {@link 
OutputTag}.
+                *
+                * @param outputTag the {@code OutputTag} that identifies the 
side output to emit to.
+                * @param value The record to emit.
+                */
+               public abstract <X> void output(OutputTag<X> outputTag, X 
value);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index 108ba9e..ee8328a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -60,8 +60,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends 
Window, T, R> extends
                }
 
                this.ctx.window = context.window();
-               this.ctx.windowState = context.windowState();
-               this.ctx.globalState = context.globalState();
+               this.ctx.context = context;
 
                windowFunction.process(ctx, Collections.singletonList(curr), 
out);
        }
@@ -69,8 +68,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends 
Window, T, R> extends
        @Override
        public void clear(final Context context) throws Exception {
                this.ctx.window = context.window();
-               this.ctx.windowState = context.windowState();
-               this.ctx.globalState = context.globalState();
+               this.ctx.context = context;
 
                windowFunction.clear(ctx);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index fd90e65..4e75345 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -775,6 +775,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        return WindowOperator.this.getKeyedStateStore();
                }
 
+               @Override
                public <X> void output(OutputTag<X> outputTag, X value) {
                        if (outputTag == null) {
                                throw new IllegalArgumentException("OutputTag 
must not be null.");

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
index 66ec656..f1146b9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal reusable context wrapper.
@@ -55,4 +56,9 @@ public class InternalProcessAllWindowContext<IN, OUT, W 
extends Window>
        public KeyedStateStore globalState() {
                return internalContext.globalState();
        }
+
+       @Override
+       public <X> void output(OutputTag<X> outputTag, X value) {
+               internalContext.output(outputTag, value);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 49911e4..b91b2a0 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala.function
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.common.state.KeyedStateStore
+import org.apache.flink.streaming.api.scala.OutputTag
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -73,6 +74,11 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window]
       * State accessor for per-key global state.
       */
     def globalState: KeyedStateStore
+
+    /**
+      * Emits a record to the side output identified by the [[OutputTag]].
+      */
+    def output[X](outputTag: OutputTag[X], value: X)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 98b050c..9a6156d 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -127,6 +127,8 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W 
<: Window](
       override def windowState = context.windowState()
 
       override def globalState = context.globalState()
+
+      override def output[X](outputTag: OutputTag[X], value: X) = 
context.output(outputTag, value)
     }
     func.process(ctx, elements.asScala, out)
   }
@@ -138,6 +140,8 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W 
<: Window](
       override def windowState = context.windowState()
 
       override def globalState = context.globalState()
+
+      override def output[X](outputTag: OutputTag[X], value: X) = 
context.output(outputTag, value)
     }
     func.clear(ctx)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
index f09323c..8e66171 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -280,6 +280,51 @@ class SideOutputITCase extends 
StreamingMultipleProgramsTestBase {
     assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
                   sideOutputResultSink.getResult)
   }
+
+  /**
+    * Test ProcessAllWindowFunction side output.
+    */
+  @Test
+  def testProcessAllWindowFunctionSideOutput() {
+    val resultSink = new TestListResultSink[String]
+    val sideOutputResultSink = new TestListResultSink[String]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), 
("4", 4))
+
+
+    val sideOutputTag = OutputTag[String]("side")
+
+    val windowOperator = dataStream
+      .assignTimestampsAndWatermarks(new TestAssigner)
+      .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
+      .process(new ProcessAllWindowFunction[(String, Int), String, TimeWindow] 
{
+        override def process(
+                              context: Context,
+                              elements: Iterable[(String, Int)],
+                              out: Collector[String]): Unit = {
+          for (in <- elements) {
+            out.collect(in._1)
+            context.output(sideOutputTag, "sideout-" + in._1)
+          }
+        }
+      })
+
+    windowOperator
+      .getSideOutput(sideOutputTag)
+      .addSink(sideOutputResultSink)
+
+    windowOperator.addSink(resultSink)
+
+    env.execute()
+
+    assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult)
+    assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
+      sideOutputResultSink.getResult)
+  }
 }
 
 class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] {

http://git-wip-us.apache.org/repos/asf/flink/blob/39682c45/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index f74f8ff..7f3fe8b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -582,4 +583,40 @@ public class SideOutputITCase extends 
StreamingMultipleProgramsTestBase implemen
                assertEquals(Arrays.asList("sideout-1", "sideout-2", 
"sideout-5"), sideOutputResultSink.getSortedResult());
                assertEquals(Arrays.asList(1, 2, 5), 
resultSink.getSortedResult());
        }
+
+       @Test
+       public void testProcessAllWindowFunctionSideOutput() throws Exception {
+               TestListResultSink<Integer> resultSink = new 
TestListResultSink<>();
+               TestListResultSink<String> sideOutputResultSink = new 
TestListResultSink<>();
+
+               StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               see.setParallelism(1);
+               see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Integer> dataStream = see.fromCollection(elements);
+
+               OutputTag<String> sideOutputTag = new 
OutputTag<String>("side"){};
+
+               SingleOutputStreamOperator<Integer> windowOperator = dataStream
+                               .assignTimestampsAndWatermarks(new 
TestWatermarkAssigner())
+                               .timeWindowAll(Time.milliseconds(1), 
Time.milliseconds(1))
+                               .process(new ProcessAllWindowFunction<Integer, 
Integer, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(Context context, 
Iterable<Integer> elements, Collector<Integer> out) throws Exception {
+                                               for (Integer e : elements) {
+                                                       out.collect(e);
+                                                       
context.output(sideOutputTag, "sideout-" + String.valueOf(e));
+                                               }
+                                       }
+                               });
+
+               
windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+               windowOperator.addSink(resultSink);
+               see.execute();
+
+               assertEquals(Arrays.asList("sideout-1", "sideout-2", 
"sideout-5"), sideOutputResultSink.getSortedResult());
+               assertEquals(Arrays.asList(1, 2, 5), 
resultSink.getSortedResult());
+       }
 }

Reply via email to