Repository: flink Updated Branches: refs/heads/master 7ef068ccc -> 354a13edf
[FLINK-6023] fix process function doc examples This closes #3510. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/354a13ed Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/354a13ed Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/354a13ed Branch: refs/heads/master Commit: 354a13edff6821c57418802048de52ad5ba3d557 Parents: 7ef068c Author: Mauro Cortellazzi <mauro.cortella...@radicalbit.io> Authored: Fri Mar 10 15:12:55 2017 +0100 Committer: Kurt Young <k...@apache.org> Committed: Sat Mar 11 10:31:38 2017 +0800 ---------------------------------------------------------------------- docs/dev/stream/process_function.md | 56 ++++++++++++++++---------------- 1 file changed, 28 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/354a13ed/docs/dev/stream/process_function.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md index 1f93f68..25de9a4 100644 --- a/docs/dev/stream/process_function.md +++ b/docs/dev/stream/process_function.md @@ -96,7 +96,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.RichProcessFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction.Context; import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext; import org.apache.flink.util.Collector; @@ -123,7 +123,7 @@ public class CountWithTimestamp { /** * The implementation of the ProcessFunction that maintains the count and timeouts */ -public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> { +public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> { /** The state that is maintained by this process function */ private ValueState<CountWithTimestamp> state; @@ -134,7 +134,7 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, } @Override - public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) + public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception { // retrieve the current count @@ -154,7 +154,7 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, state.update(current); // schedule the next timer 60 seconds from the current event time - ctx.timerService().registerEventTimeTimer(current.timestamp + 60000); + ctx.timerService().registerEventTimeTimer(current.lastModified + 60000); } @Override @@ -165,8 +165,8 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, CountWithTimestamp result = state.value(); // check if this is an outdated timer or the latest timer - if (timestamp == result.lastModified) { - // emit the state + if (timestamp == result.lastModified + 60000) { + // emit the state on timeout out.collect(new Tuple2<String, Long>(result.key, result.count)); } } @@ -176,43 +176,43 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, <div data-lang="scala" markdown="1"> {% highlight scala %} -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.functions.ProcessFunction.Context; -import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext; -import org.apache.flink.util.Collector; +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.functions.ProcessFunction.Context +import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext +import org.apache.flink.util.Collector // the source data stream -DataStream<Tuple2<String, String>> stream = ...; +val stream: DataStream[Tuple2[String, String]] = ... // apply the process function onto a keyed stream -DataStream<Tuple2<String, Long>> result = stream - .keyBy(0) - .process(new CountWithTimeoutFunction()); +val result: DataStream[Tuple2[String, Long]] = stream + .keyBy(0) + .process(new CountWithTimeoutFunction()) /** - * The data type stored in the state - */ + * The data type stored in the state + */ case class CountWithTimestamp(key: String, count: Long, lastModified: Long) /** - * The implementation of the ProcessFunction that maintains the count and timeouts - */ -class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] { + * The implementation of the ProcessFunction that maintains the count and timeouts + */ +class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] { /** The state that is maintained by this process function */ - lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext() - .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp])) + lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext + .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp])) - override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = { + override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = { // initialize or retrieve/update the state val current: CountWithTimestamp = state.value match { case null => - CountWithTimestamp(key, 1, ctx.timestamp) - case CountWithTimestamp(key, count, time) => + CountWithTimestamp(value._1, 1, ctx.timestamp) + case CountWithTimestamp(key, count, lastModified) => CountWithTimestamp(key, count + 1, ctx.timestamp) } @@ -220,12 +220,12 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long state.update(current) // schedule the next timer 60 seconds from the current event time - ctx.timerService.registerEventTimeTimer(current.timestamp + 60000) + ctx.timerService.registerEventTimeTimer(current.lastModified + 60000) } override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = { state.value match { - case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) => + case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp + 60000) => out.collect((key, count)) case _ => }