This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f8adc03e800b8f60d9bed5e127c342c10d8f337
Author: Dian Fu <dia...@apache.org>
AuthorDate: Fri Apr 23 19:35:21 2021 +0800

    [FLINK-20720][python][docs] Add documentation for ProcessFunction in Python 
DataStream API
    
    This closes #15733.
---
 .../dev/datastream/operators/process_function.md   | 119 +++++++++++++++++++++
 .../datastream/operators/process_function.md       |  52 +++++++++
 .../dev/datastream/operators/process_function.md   | 119 +++++++++++++++++++++
 .../datastream/operators/process_function.md       |  52 +++++++++
 4 files changed, 342 insertions(+)

diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md 
b/docs/content.zh/docs/dev/datastream/operators/process_function.md
index c8078f7..38426b5 100644
--- a/docs/content.zh/docs/dev/datastream/operators/process_function.md
+++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md
@@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends 
KeyedProcessFunction[Tuple, (String, Stri
 }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+import datetime
+
+from pyflink.common import Row, WatermarkStrategy
+from pyflink.common.typeinfo import Types
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment
+
+
+class CountWithTimeoutFunction(KeyedProcessFunction):
+
+    def __init__(self):
+        self.state = None
+
+    def open(self, runtime_context: RuntimeContext):
+        self.state = runtime_context.get_state(ValueStateDescriptor(
+            "my_state", Types.ROW([Types.STRING(), Types.LONG(), 
Types.LONG()])))
+
+    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+        # retrieve the current count
+        current = self.state.value()
+        if current is None:
+            current = Row(value.f1, 0, 0)
+
+        # update the state's count
+        current[1] += 1
+
+        # set the state's timestamp to the record's assigned event time 
timestamp
+        current[2] = ctx.timestamp()
+
+        # write the state back
+        self.state.update(current)
+
+        # schedule the next timer 60 seconds from the current event time
+        ctx.timer_service().register_event_time_timer(current[2] + 60000)
+
+    def on_timer(self, timestamp: int, ctx: 
'KeyedProcessFunction.OnTimerContext'):
+        # get the state for the key that scheduled the timer
+        result = self.state.value()
+
+        # check if this is an outdated timer or the latest timer
+        if timestamp == result[2] + 60000:
+            # emit the state on timeout
+            yield result[0], result[1]
+
+
+class MyTimestampAssigner(TimestampAssigner):
+
+    def __init__(self):
+        self.epoch = datetime.datetime.utcfromtimestamp(0)
+
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int((value[0] - self.epoch).total_seconds() * 1000)
+
+
+if __name__ == '__main__':
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+            CREATE TABLE my_source (
+              a TIMESTAMP(3),
+              b VARCHAR,
+              c VARCHAR
+            ) WITH (
+              'connector' = 'datagen',
+              'rows-per-second' = '10'
+            )
+        """)
+
+    stream = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
+    watermarked_stream = stream.assign_timestamps_and_watermarks(
+        WatermarkStrategy.for_monotonous_timestamps()
+                         .with_timestamp_assigner(MyTimestampAssigner()))
+
+    # apply the process function onto a keyed stream
+    result = watermarked_stream.key_by(lambda value: value[1]) \
+                               .process(CountWithTimeoutFunction()) \
+                               .print()
+    env.execute()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 
@@ -281,6 +369,13 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, 
out: Collector[OUT]):
 }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
+    key = ctx.get_current_key()
+    # ...
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ## Timers
@@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 
1000
 ctx.timerService.registerProcessingTimeTimer(coalescedTime)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000
+ctx.timer_service().register_processing_time_timer(coalesced_time)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Since event-time timers only fire with watermarks coming in, you may also 
schedule and coalesce
@@ -345,6 +446,12 @@ val coalescedTime = ctx.timerService.currentWatermark + 1
 ctx.timerService.registerEventTimeTimer(coalescedTime)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+coalesced_time = ctx.timer_service().current_watermark() + 1
+ctx.timer_service().register_event_time_timer(coalesced_time)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Timers can also be stopped and removed as follows:
@@ -364,6 +471,12 @@ val timestampOfTimerToStop = ...
 ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+timestamp_of_timer_to_stop = ...
+ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Stopping an event-time timer:
@@ -381,6 +494,12 @@ val timestampOfTimerToStop = ...
 ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+timestamp_of_timer_to_stop = ...
+ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 {{< hint info >}}
diff --git 
a/docs/content.zh/docs/dev/python/datastream/operators/process_function.md 
b/docs/content.zh/docs/dev/python/datastream/operators/process_function.md
new file mode 100644
index 0000000..d9f772b
--- /dev/null
+++ b/docs/content.zh/docs/dev/python/datastream/operators/process_function.md
@@ -0,0 +1,52 @@
+---
+title: "Process Function"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Process Function
+
+## ProcessFunction
+
+The `ProcessFunction` is a low-level stream processing operation, giving 
access to the basic building blocks of
+all (acyclic) streaming applications:
+
+- events (stream elements)
+- state (fault-tolerant, consistent, only on keyed stream)
+- timers (event time and processing time, only on keyed stream)
+
+The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to 
keyed state and timers. It handles events
+by being invoked for each event received in the input stream(s).
+
+Please refer to [Process Function]({{< ref 
"docs/dev/datastream/operators/process_function" >}})
+for more details about the concept and usage of `ProcessFunction`.
+
+## Execution behavior of timer
+
+Since Python user-defined functions are executed in a Python process separate 
from Flink's operators, which run in a JVM,
+the timer registration requests made in `ProcessFunction` are sent to the 
Java operator asynchronously.
+Once it receives a timer registration request, the Java operator registers the 
timer with the underlying timer service.
+
+If the timestamp of a registered timer is already behind the current time (the 
current system time for processing time timers,
+or the current watermark for event time timers), the timer will be triggered immediately.
+
+Note that, due to the asynchronous processing, a timer may be triggered a 
little later than the time it was registered for.
+For example, a processing time timer registered for `10:00:00` may actually be 
processed at `10:00:05`.
diff --git a/docs/content/docs/dev/datastream/operators/process_function.md 
b/docs/content/docs/dev/datastream/operators/process_function.md
index 90d9ed1..c1335ba 100644
--- a/docs/content/docs/dev/datastream/operators/process_function.md
+++ b/docs/content/docs/dev/datastream/operators/process_function.md
@@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends 
KeyedProcessFunction[Tuple, (String, Stri
 }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+import datetime
+
+from pyflink.common import Row, WatermarkStrategy
+from pyflink.common.typeinfo import Types
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment
+
+
+class CountWithTimeoutFunction(KeyedProcessFunction):
+
+    def __init__(self):
+        self.state = None
+
+    def open(self, runtime_context: RuntimeContext):
+        self.state = runtime_context.get_state(ValueStateDescriptor(
+            "my_state", Types.ROW([Types.STRING(), Types.LONG(), 
Types.LONG()])))
+
+    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+        # retrieve the current count
+        current = self.state.value()
+        if current is None:
+            current = Row(value.f1, 0, 0)
+
+        # update the state's count
+        current[1] += 1
+
+        # set the state's timestamp to the record's assigned event time 
timestamp
+        current[2] = ctx.timestamp()
+
+        # write the state back
+        self.state.update(current)
+
+        # schedule the next timer 60 seconds from the current event time
+        ctx.timer_service().register_event_time_timer(current[2] + 60000)
+
+    def on_timer(self, timestamp: int, ctx: 
'KeyedProcessFunction.OnTimerContext'):
+        # get the state for the key that scheduled the timer
+        result = self.state.value()
+
+        # check if this is an outdated timer or the latest timer
+        if timestamp == result[2] + 60000:
+            # emit the state on timeout
+            yield result[0], result[1]
+
+
+class MyTimestampAssigner(TimestampAssigner):
+
+    def __init__(self):
+        self.epoch = datetime.datetime.utcfromtimestamp(0)
+
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int((value[0] - self.epoch).total_seconds() * 1000)
+
+
+if __name__ == '__main__':
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+            CREATE TABLE my_source (
+              a TIMESTAMP(3),
+              b VARCHAR,
+              c VARCHAR
+            ) WITH (
+              'connector' = 'datagen',
+              'rows-per-second' = '10'
+            )
+        """)
+
+    stream = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
+    watermarked_stream = stream.assign_timestamps_and_watermarks(
+        WatermarkStrategy.for_monotonous_timestamps()
+                         .with_timestamp_assigner(MyTimestampAssigner()))
+
+    # apply the process function onto a keyed stream
+    result = watermarked_stream.key_by(lambda value: value[1]) \
+                               .process(CountWithTimeoutFunction()) \
+                               .print()
+    env.execute()
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 
@@ -281,6 +369,13 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, 
out: Collector[OUT]):
 }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
+    key = ctx.get_current_key()
+    # ...
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 ## Timers
@@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 
1000
 ctx.timerService.registerProcessingTimeTimer(coalescedTime)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000
+ctx.timer_service().register_processing_time_timer(coalesced_time)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Since event-time timers only fire with watermarks coming in, you may also 
schedule and coalesce
@@ -345,6 +446,12 @@ val coalescedTime = ctx.timerService.currentWatermark + 1
 ctx.timerService.registerEventTimeTimer(coalescedTime)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+coalesced_time = ctx.timer_service().current_watermark() + 1
+ctx.timer_service().register_event_time_timer(coalesced_time)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Timers can also be stopped and removed as follows:
@@ -364,6 +471,12 @@ val timestampOfTimerToStop = ...
 ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+timestamp_of_timer_to_stop = ...
+ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 Stopping an event-time timer:
@@ -381,6 +494,12 @@ val timestampOfTimerToStop = ...
 ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+timestamp_of_timer_to_stop = ...
+ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop)
+```
+{{< /tab >}}
 {{< /tabs >}}
 
 {{< hint info >}}
diff --git 
a/docs/content/docs/dev/python/datastream/operators/process_function.md 
b/docs/content/docs/dev/python/datastream/operators/process_function.md
new file mode 100644
index 0000000..d9f772b
--- /dev/null
+++ b/docs/content/docs/dev/python/datastream/operators/process_function.md
@@ -0,0 +1,52 @@
+---
+title: "Process Function"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Process Function
+
+## ProcessFunction
+
+The `ProcessFunction` is a low-level stream processing operation, giving 
access to the basic building blocks of
+all (acyclic) streaming applications:
+
+- events (stream elements)
+- state (fault-tolerant, consistent, only on keyed stream)
+- timers (event time and processing time, only on keyed stream)
+
+The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to 
keyed state and timers. It handles events
+by being invoked for each event received in the input stream(s).
+
+Please refer to [Process Function]({{< ref 
"docs/dev/datastream/operators/process_function" >}})
+for more details about the concept and usage of `ProcessFunction`.
+
+## Execution behavior of timer
+
+Since Python user-defined functions are executed in a Python process separate 
from Flink's operators, which run in a JVM,
+the timer registration requests made in `ProcessFunction` are sent to the 
Java operator asynchronously.
+Once it receives a timer registration request, the Java operator registers the 
timer with the underlying timer service.
+
+If the timestamp of a registered timer is already behind the current time (the 
current system time for processing time timers,
+or the current watermark for event time timers), the timer will be triggered immediately.
+
+Note that, due to the asynchronous processing, a timer may be triggered a 
little later than the time it was registered for.
+For example, a processing time timer registered for `10:00:00` may actually be 
processed at `10:00:05`.

Reply via email to