[FLINK-7635] Support side output in ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c151a537 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c151a537 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c151a537 Branch: refs/heads/master Commit: c151a537c205d20db598354ba5afc4f228c746c3 Parents: 68a99d7 Author: Bowen Li <bowenl...@gmail.com> Authored: Tue Sep 19 23:35:34 2017 -0700 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Sep 25 12:06:51 2017 +0200 ---------------------------------------------------------------------- .../InternalProcessApplyWindowContext.java | 6 +++ .../windowing/ProcessWindowFunction.java | 9 ++++ .../api/operators/ProcessOperator.java | 5 +-- .../operators/windowing/WindowOperator.java | 8 ++++ .../functions/InternalProcessWindowContext.java | 6 +++ .../functions/InternalWindowFunction.java | 3 ++ .../scala/function/ProcessWindowFunction.scala | 9 ++-- .../ScalaProcessWindowFunctionWrapper.scala | 5 +++ .../streaming/api/scala/SideOutputITCase.scala | 46 ++++++++++++++++++++ .../streaming/runtime/SideOutputITCase.java | 35 +++++++++++++++ 10 files changed, 125 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java index 47a2e3a..3d52e35 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.OutputTag; /** * Internal reusable context wrapper. @@ -64,4 +65,9 @@ public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window> public KeyedStateStore globalState() { return context.globalState(); } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + context.output(outputTag, value); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java index 506b610..08ed49c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * Base abstract class for functions that are evaluated over keyed (grouped) windows using a context @@ -85,5 +86,13 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> exte * State accessor
for per-key global state. */ public abstract KeyedStateStore globalState(); + + /** + * Emits a record to the side output identified by the {@link OutputTag}. + * + * @param outputTag the {@code OutputTag} that identifies the side output to emit to. + * @param value the record to emit. + */ + public abstract <X> void output(OutputTag<X> outputTag, X value); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java index 5c9e8fc..b353a63 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java @@ -73,10 +73,7 @@ public class ProcessOperator<IN, OUT> this.currentWatermark = mark.getTimestamp(); } - private class ContextImpl - extends ProcessFunction<IN, OUT>.Context - implements TimerService { - + private class ContextImpl extends ProcessFunction<IN, OUT>.Context implements TimerService { private StreamRecord<IN> element; private final ProcessingTimeService processingTimeService; http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index b14739f..fd90e65 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -774,6 +774,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> public KeyedStateStore globalState() { return WindowOperator.this.getKeyedStateStore(); } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + if (outputTag == null) { + throw new IllegalArgumentException("OutputTag must not be null."); + } + output.collect(outputTag, new StreamRecord<>(value, window.maxTimestamp())); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java index 9505332..4d5d1c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.OutputTag; /** * Internal reusable context wrapper.
@@ -66,4 +67,9 @@ public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window> public KeyedStateStore globalState() { return internalContext.globalState(); } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + internalContext.output(outputTag, value); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java index 0999565..c304d7a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * Internal interface for functions that are evaluated over keyed (grouped) windows.
@@ -63,5 +64,7 @@ public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends KeyedStateStore windowState(); KeyedStateStore globalState(); + + <X> void output(OutputTag<X> outputTag, X value); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala index d2075db..7ae51ea 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala @@ -18,11 +18,10 @@ package org.apache.flink.streaming.api.scala.function -import java.io.Serializable - import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.functions.AbstractRichFunction import org.apache.flink.api.common.state.KeyedStateStore +import org.apache.flink.streaming.api.scala.OutputTag import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector @@ -88,6 +87,10 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] * State accessor for per-key global state. */ def globalState: KeyedStateStore - } + /** + * Emits a record to the side output identified by the [[OutputTag]].
+ */ + def output[X](outputTag: OutputTag[X], value: X): Unit + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala index bc4b7dd..98b050c 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction} import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction} +import org.apache.flink.streaming.api.scala.OutputTag import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction} import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction} import org.apache.flink.streaming.api.windowing.windows.Window @@ -56,6 +57,8 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( override def windowState = context.windowState() override def globalState = context.globalState() + + override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value) } func.process(key, ctx, elements.asScala, out) } @@ -71,6 +74,8 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <:
Window]( override def windowState = context.windowState() override def globalState = context.globalState() + + override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value) } func.clear(ctx) } http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala index 29bcbcf..f09323c 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala @@ -234,6 +234,52 @@ class SideOutputITCase extends StreamingMultipleProgramsTestBase { assertEquals(util.Arrays.asList(("3", 3), ("4", 4)), lateResultSink.getResult) } + /** + * Test ProcessWindowFunction side output.
+ */ + @Test + def testProcessWindowFunctionSideOutput() { + val resultSink = new TestListResultSink[String] + val sideOutputResultSink = new TestListResultSink[String] + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + + val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4)) + + + val sideOutputTag = OutputTag[String]("side") + + val windowOperator = dataStream + .assignTimestampsAndWatermarks(new TestAssigner) + .keyBy(i => i._1) + .window(TumblingEventTimeWindows.of(Time.milliseconds(1))) + .process(new ProcessWindowFunction[(String, Int), String, String, TimeWindow] { + override def process( + key: String, + context: Context, + elements: Iterable[(String, Int)], + out: Collector[String]): Unit = { + for (in <- elements) { + out.collect(in._1) + context.output(sideOutputTag, "sideout-" + in._1) + } + } + }) + + windowOperator + .getSideOutput(sideOutputTag) + .addSink(sideOutputResultSink) + + windowOperator.addSink(resultSink) + + env.execute() + + assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult) + assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"), + sideOutputResultSink.getResult) + } } class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] { http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java index f73bf42..f74f8ff 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java @@ -29,6
+29,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -547,4 +548,38 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen assertEquals(Collections.singletonList(3), lateResultSink.getSortedResult()); } + @Test + public void testProcessWindowFunctionSideOutput() throws Exception { + TestListResultSink<Integer> resultSink = new TestListResultSink<>(); + TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>(); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(3); + see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + DataStream<Integer> dataStream = see.fromCollection(elements); + + OutputTag<String> sideOutputTag = new OutputTag<String>("side"){}; + + SingleOutputStreamOperator<Integer> windowOperator = dataStream + .assignTimestampsAndWatermarks(new TestWatermarkAssigner()) + .keyBy(new TestKeySelector()) + .timeWindow(Time.milliseconds(1), Time.milliseconds(1)) + .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void process(Integer key, Context context, Iterable<Integer> windowElements, Collector<Integer> out) throws Exception { + out.collect(key); + context.output(sideOutputTag, "sideout-" + key); + } + }); +
windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink); + windowOperator.addSink(resultSink); + see.execute(); + + assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult()); + assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult()); + } }