This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 302c26e52454733f09820a44957b8fd0fe8f7d68 Author: shengqian.zhou <shengqian.z...@ly.com> AuthorDate: Tue Apr 2 19:54:06 2019 +0800 [hotfix][docs] Update process function example to use KeyedProcessFunction. This closes #8101. --- docs/dev/stream/operators/process_function.md | 38 ++++++++++++++++++--------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md index f0189f9..0216d84 100644 --- a/docs/dev/stream/operators/process_function.md +++ b/docs/dev/stream/operators/process_function.md @@ -77,16 +77,16 @@ trade. ## Example -The following example maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key: +In the following example a `KeyedProcessFunction` maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key: - The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key. - - For each record, the `ProcessFunction` increments the counter and sets the last-modification timestamp + - For each record, the `KeyedProcessFunction` increments the counter and sets the last-modification timestamp - The function also schedules a callback one minute into the future (in event time) - Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count and emits the key/count if they match (i.e., no further update occurred during that minute) <span class="label label-info">Note</span> This simple example could have been implemented with -session windows. We use `ProcessFunction` here to illustrate the basic pattern it provides. +session windows. We use `KeyedProcessFunction` here to illustrate the basic pattern it provides. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -94,6 +94,7 @@ session windows. We use `ProcessFunction` here to illustrate the basic pattern i {% highlight java %} import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; @@ -123,7 +124,8 @@ public class CountWithTimestamp { /** * The implementation of the ProcessFunction that maintains the count and timeouts */ -public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> { +public class CountWithTimeoutFunction + extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> { /** The state that is maintained by this process function */ private ValueState<CountWithTimestamp> state; @@ -134,8 +136,10 @@ public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, Str } @Override - public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out) - throws Exception { + public void processElement( + Tuple2<String, String> value, + Context ctx, + Collector<Tuple2<String, Long>> out) throws Exception { // retrieve the current count CountWithTimestamp current = state.value(); @@ -158,8 +162,10 @@ public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, Str } @Override - public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) - throws Exception { + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<Tuple2<String, Long>> out) throws Exception { // get the state for the key that scheduled the timer CountWithTimestamp result = state.value(); @@ -178,6 +184,7 @@ public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, Str {% highlight scala %} import org.apache.flink.api.common.state.ValueState import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.ProcessFunction.Context import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext @@ -199,16 +206,19 @@ case class CountWithTimestamp(key: String, count: Long, lastModified: Long) /** * The implementation of the ProcessFunction that maintains the count and timeouts */ -class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] { +class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] { /** The state that is maintained by this process function */ lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp])) - override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = { - // initialize or retrieve/update the state + override def processElement( + value: (String, String), + ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context, + out: Collector[(String, Long)]): Unit = { + // initialize or retrieve/update the state val current: CountWithTimestamp = state.value match { case null => CountWithTimestamp(value._1, 1, ctx.timestamp) @@ -223,7 +233,11 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String ctx.timerService.registerEventTimeTimer(current.lastModified + 60000) } - override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = { + override def onTimer( + timestamp: Long, + ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext, + out: Collector[(String, Long)]): Unit = { + state.value match { case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) => out.collect((key, count))