This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ad8e4f8fa6d179bf17759c656528fab62aec8aa
Author: shengqian.zhou <shengqian.z...@ly.com>
AuthorDate: Tue Apr 2 19:54:06 2019 +0800

    [hotfix][docs] Update process function example to use KeyedProcessFunction.
    
    This closes #8101.
---
 docs/dev/stream/operators/process_function.md | 38 ++++++++++++++++++---------
 1 file changed, 26 insertions(+), 12 deletions(-)

diff --git a/docs/dev/stream/operators/process_function.md 
b/docs/dev/stream/operators/process_function.md
index f0189f9..0216d84 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -77,16 +77,16 @@ trade.
 
 ## Example
 
-The following example maintains counts per key, and emits a key/count pair 
whenever a minute passes (in event time) without an update for that key:
+In the following example a `KeyedProcessFunction` maintains counts per key, 
and emits a key/count pair whenever a minute passes (in event time) without an 
update for that key:
 
   - The count, key, and last-modification-timestamp are stored in a 
`ValueState`, which is implicitly scoped by key.
-  - For each record, the `ProcessFunction` increments the counter and sets the 
last-modification timestamp
+  - For each record, the `KeyedProcessFunction` increments the counter and 
sets the last-modification timestamp
   - The function also schedules a callback one minute into the future (in 
event time)
   - Upon each callback, it checks the callback's event time timestamp against 
the last-modification time of the stored count
     and emits the key/count if they match (i.e., no further update occurred 
during that minute)
 
 <span class="label label-info">Note</span> This simple example could have been 
implemented with
-session windows. We use `ProcessFunction` here to illustrate the basic pattern 
it provides.
+session windows. We use `KeyedProcessFunction` here to illustrate the basic 
pattern it provides.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -94,6 +94,7 @@ session windows. We use `ProcessFunction` here to illustrate 
the basic pattern i
 {% highlight java %}
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -123,7 +124,8 @@ public class CountWithTimestamp {
 /**
  * The implementation of the ProcessFunction that maintains the count and 
timeouts
  */
-public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, 
String>, Tuple2<String, Long>> {
+public class CountWithTimeoutFunction 
+        extends KeyedProcessFunction<Tuple, Tuple2<String, String>, 
Tuple2<String, Long>> {
 
     /** The state that is maintained by this process function */
     private ValueState<CountWithTimestamp> state;
@@ -134,8 +136,10 @@ public class CountWithTimeoutFunction extends 
ProcessFunction<Tuple2<String, Str
     }
 
     @Override
-    public void processElement(Tuple2<String, String> value, Context ctx, 
Collector<Tuple2<String, Long>> out)
-            throws Exception {
+    public void processElement(
+            Tuple2<String, String> value, 
+            Context ctx, 
+            Collector<Tuple2<String, Long>> out) throws Exception {
 
         // retrieve the current count
         CountWithTimestamp current = state.value();
@@ -158,8 +162,10 @@ public class CountWithTimeoutFunction extends 
ProcessFunction<Tuple2<String, Str
     }
 
     @Override
-    public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Tuple2<String, Long>> out)
-            throws Exception {
+    public void onTimer(
+            long timestamp, 
+            OnTimerContext ctx, 
+            Collector<Tuple2<String, Long>> out) throws Exception {
 
         // get the state for the key that scheduled the timer
         CountWithTimestamp result = state.value();
@@ -178,6 +184,7 @@ public class CountWithTimeoutFunction extends 
ProcessFunction<Tuple2<String, Str
 {% highlight scala %}
 import org.apache.flink.api.common.state.ValueState
 import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.ProcessFunction.Context
 import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
@@ -199,16 +206,19 @@ case class CountWithTimestamp(key: String, count: Long, 
lastModified: Long)
 /**
   * The implementation of the ProcessFunction that maintains the count and 
timeouts
   */
-class CountWithTimeoutFunction extends ProcessFunction[(String, String), 
(String, Long)] {
+class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, 
String), (String, Long)] {
 
   /** The state that is maintained by this process function */
   lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
     .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
-  override def processElement(value: (String, String), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
-    // initialize or retrieve/update the state
+  override def processElement(
+      value: (String, String), 
+      ctx: KeyedProcessFunction[Tuple, (String, String), (String, 
Long)]#Context, 
+      out: Collector[(String, Long)]): Unit = {
 
+    // initialize or retrieve/update the state
     val current: CountWithTimestamp = state.value match {
       case null =>
         CountWithTimestamp(value._1, 1, ctx.timestamp)
@@ -223,7 +233,11 @@ class CountWithTimeoutFunction extends 
ProcessFunction[(String, String), (String
     ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
   }
 
-  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: 
Collector[(String, Long)]): Unit = {
+  override def onTimer(
+      timestamp: Long, 
+      ctx: KeyedProcessFunction[Tuple, (String, String), (String, 
Long)]#OnTimerContext, 
+      out: Collector[(String, Long)]): Unit = {
+
     state.value match {
       case CountWithTimestamp(key, count, lastModified) if (timestamp == 
lastModified + 60000) =>
         out.collect((key, count))

Reply via email to