[FLINK-7635] Support side output in ProcessWindowFunction

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c151a537
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c151a537
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c151a537

Branch: refs/heads/master
Commit: c151a537c205d20db598354ba5afc4f228c746c3
Parents: 68a99d7
Author: Bowen Li <bowenl...@gmail.com>
Authored: Tue Sep 19 23:35:34 2017 -0700
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Sep 25 12:06:51 2017 +0200

----------------------------------------------------------------------
 .../InternalProcessApplyWindowContext.java      |  6 +++
 .../windowing/ProcessWindowFunction.java        |  9 ++++
 .../api/operators/ProcessOperator.java          |  5 +--
 .../operators/windowing/WindowOperator.java     |  7 +++
 .../functions/InternalProcessWindowContext.java |  6 +++
 .../functions/InternalWindowFunction.java       |  3 ++
 .../scala/function/ProcessWindowFunction.scala  |  9 ++--
 .../ScalaProcessWindowFunctionWrapper.scala     |  5 +++
 .../streaming/api/scala/SideOutputITCase.scala  | 46 ++++++++++++++++++++
 .../streaming/runtime/SideOutputITCase.java     | 35 +++++++++++++++
 10 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
index 47a2e3a..3d52e35 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal reusable context wrapper.
@@ -64,4 +65,9 @@ public class InternalProcessApplyWindowContext<IN, OUT, KEY, 
W extends Window>
        public KeyedStateStore globalState() {
                return context.globalState();
        }
+
+       @Override
+       public <X> void output(OutputTag<X> outputTag, X value) {
+               context.output(outputTag, value);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 506b610..08ed49c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Base abstract class for functions that are evaluated over keyed (grouped) 
windows using a context
@@ -85,5 +86,13 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W 
extends Window> exte
                 * State accessor for per-key global state.
                 */
                public abstract KeyedStateStore globalState();
+
+               /**
+                * Emits a record to the side output identified by the {@link 
OutputTag}.
+                *
+                * @param outputTag the {@code OutputTag} that identifies the 
side output to emit to.
+                * @param value The record to emit.
+                */
+               public abstract <X> void output(OutputTag<X> outputTag, X 
value);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 5c9e8fc..b353a63 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -73,10 +73,7 @@ public class ProcessOperator<IN, OUT>
                this.currentWatermark = mark.getTimestamp();
        }
 
-       private class ContextImpl
-                       extends ProcessFunction<IN, OUT>.Context
-                       implements TimerService {
-
+       private class ContextImpl extends ProcessFunction<IN, OUT>.Context 
implements TimerService {
                private StreamRecord<IN> element;
 
                private final ProcessingTimeService processingTimeService;

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b14739f..fd90e65 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -774,6 +774,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                public KeyedStateStore globalState() {
                        return WindowOperator.this.getKeyedStateStore();
                }
+
+               /**
+                * Emits a record to the side output identified by the given {@link OutputTag}.
+                * The record is wrapped in a {@code StreamRecord} stamped with the current
+                * window's maximum timestamp.
+                *
+                * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+                * @param value the record to emit.
+                * @throws IllegalArgumentException if {@code outputTag} is {@code null}.
+                */
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       if (outputTag == null) {
+                               throw new IllegalArgumentException("OutputTag must not be null.");
+                       }
+                       output.collect(outputTag, new StreamRecord<>(value, window.maxTimestamp()));
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
index 9505332..4d5d1c6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal reusable context wrapper.
@@ -66,4 +67,9 @@ public class InternalProcessWindowContext<IN, OUT, KEY, W 
extends Window>
        public KeyedStateStore globalState() {
                return internalContext.globalState();
        }
+
+       @Override
+       public <X> void output(OutputTag<X> outputTag, X value) {
+               internalContext.output(outputTag, value);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 0999565..c304d7a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal interface for functions that are evaluated over keyed (grouped) 
windows.
@@ -63,5 +64,7 @@ public interface InternalWindowFunction<IN, OUT, KEY, W 
extends Window> extends
                KeyedStateStore windowState();
 
                KeyedStateStore globalState();
+
+               <X> void output(OutputTag<X> outputTag, X value);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index d2075db..7ae51ea 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -18,11 +18,10 @@
 
 package org.apache.flink.streaming.api.scala.function
 
-import java.io.Serializable
-
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.common.state.KeyedStateStore
+import org.apache.flink.streaming.api.scala.OutputTag
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -88,6 +87,10 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: 
Window]
       * State accessor for per-key global state.
       */
     def globalState: KeyedStateStore
-  }
 
+    /**
+      * Emits a record to the side output identified by the [[OutputTag]].
+      *
+      * @param outputTag the [[OutputTag]] that identifies the side output to emit to.
+      * @param value the record to emit.
+      */
+    def output[X](outputTag: OutputTag[X], value: X): Unit
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index bc4b7dd..98b050c 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.configuration.Configuration
 import 
org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => 
JProcessWindowFunction}
 import 
org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => 
JProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.OutputTag
 import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => 
ScalaProcessWindowFunction}
 import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction 
=> ScalaProcessAllWindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -56,6 +57,8 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W 
<: Window](
       override def windowState = context.windowState()
 
       override def globalState = context.globalState()
+
+      override def output[X](outputTag: OutputTag[X], value: X) = 
context.output(outputTag, value)
     }
     func.process(key, ctx, elements.asScala, out)
   }
@@ -71,6 +74,8 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W 
<: Window](
       override def windowState = context.windowState()
 
       override def globalState = context.globalState()
+
+      override def output[X](outputTag: OutputTag[X], value: X) = 
context.output(outputTag, value)
     }
     func.clear(ctx)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
index 29bcbcf..f09323c 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -234,6 +234,52 @@ class SideOutputITCase extends 
StreamingMultipleProgramsTestBase {
     assertEquals(util.Arrays.asList(("3", 3), ("4", 4)), 
lateResultSink.getResult)
   }
 
+  /**
+    * Test ProcessWindowFunction side output.
+    */
+  @Test
+  def testProcessWindowFunctionSideOutput() {
+    val resultSink = new TestListResultSink[String]
+    val sideOutputResultSink = new TestListResultSink[String]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), 
("4", 4))
+
+
+    val sideOutputTag = OutputTag[String]("side")
+
+    val windowOperator = dataStream
+      .assignTimestampsAndWatermarks(new TestAssigner)
+      .keyBy(i => i._1)
+      .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
+      .process(new ProcessWindowFunction[(String, Int), String, String, 
TimeWindow] {
+        override def process(
+            key: String,
+            context: Context,
+            elements: Iterable[(String, Int)],
+            out: Collector[String]): Unit = {
+          for (in <- elements) {
+            out.collect(in._1)
+            context.output(sideOutputTag, "sideout-" + in._1)
+          }
+        }
+      })
+
+    windowOperator
+      .getSideOutput(sideOutputTag)
+      .addSink(sideOutputResultSink)
+
+    windowOperator.addSink(resultSink)
+
+    env.execute()
+
+    assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult)
+    assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
+                  sideOutputResultSink.getResult)
+  }
 }
 
 class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] {

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index f73bf42..f74f8ff 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -547,4 +548,38 @@ public class SideOutputITCase extends 
StreamingMultipleProgramsTestBase implemen
                assertEquals(Collections.singletonList(3), 
lateResultSink.getSortedResult());
        }
 
+       /**
+        * Test ProcessWindowFunction side output.
+        *
+        * <p>Each invocation of the window function emits the window key to the main output and
+        * {@code "sideout-" + key} to the side output; both sinks are then compared in sorted order.
+        */
+       @Test
+       public void testProcessWindowFunctionSideOutput() throws Exception {
+               TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+               TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+
+               StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+               see.setParallelism(3);
+               see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               DataStream<Integer> dataStream = see.fromCollection(elements);
+
+               OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+
+               SingleOutputStreamOperator<Integer> windowOperator = dataStream
+                               .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
+                               .keyBy(new TestKeySelector())
+                               .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
+                               .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+                                       private static final long serialVersionUID = 1L;
+
+                                       @Override
+                                       public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
+                                               // The window contents are not inspected; only the key is emitted.
+                                               out.collect(key);
+                                               context.output(sideOutputTag, "sideout-" + key);
+                                       }
+                               });
+
+               windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+               windowOperator.addSink(resultSink);
+               see.execute();
+
+               assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
+               assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
+       }
 }

Reply via email to