This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0f8adc03e800b8f60d9bed5e127c342c10d8f337 Author: Dian Fu <dia...@apache.org> AuthorDate: Fri Apr 23 19:35:21 2021 +0800 [FLINK-20720][python][docs] Add documentation for ProcessFunction in Python DataStream API This closes #15733. --- .../dev/datastream/operators/process_function.md | 119 +++++++++++++++++++++ .../datastream/operators/process_function.md | 52 +++++++++ .../dev/datastream/operators/process_function.md | 119 +++++++++++++++++++++ .../datastream/operators/process_function.md | 52 +++++++++ 4 files changed, 342 insertions(+) diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md b/docs/content.zh/docs/dev/datastream/operators/process_function.md index c8078f7..38426b5 100644 --- a/docs/content.zh/docs/dev/datastream/operators/process_function.md +++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md @@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, Stri } ``` {{< /tab >}} +{{< tab "Python" >}} +```python +import datetime + +from pyflink.common import Row, WatermarkStrategy +from pyflink.common.typeinfo import Types +from pyflink.common.watermark_strategy import TimestampAssigner +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext +from pyflink.datastream.state import ValueStateDescriptor +from pyflink.table import StreamTableEnvironment + + +class CountWithTimeoutFunction(KeyedProcessFunction): + + def __init__(self): + self.state = None + + def open(self, runtime_context: RuntimeContext): + self.state = runtime_context.get_state(ValueStateDescriptor( + "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()]))) + + def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): + # retrieve the current count + current = self.state.value() + if current is None: + current = Row(value.f1, 0, 0) + + # update the state's count + current[1] += 1 + + # set the 
state's timestamp to the record's assigned event time timestamp + current[2] = ctx.timestamp() + + # write the state back + self.state.update(current) + + # schedule the next timer 60 seconds from the current event time + ctx.timer_service().register_event_time_timer(current[2] + 60000) + + def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): + # get the state for the key that scheduled the timer + result = self.state.value() + + # check if this is an outdated timer or the latest timer + if timestamp == result[2] + 60000: + # emit the state on timeout + yield result[0], result[1] + + +class MyTimestampAssigner(TimestampAssigner): + + def __init__(self): + self.epoch = datetime.datetime.utcfromtimestamp(0) + + def extract_timestamp(self, value, record_timestamp) -> int: + return int((value[0] - self.epoch).total_seconds() * 1000) + + +if __name__ == '__main__': + env = StreamExecutionEnvironment.get_execution_environment() + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + + t_env.execute_sql(""" + CREATE TABLE my_source ( + a TIMESTAMP(3), + b VARCHAR, + c VARCHAR + ) WITH ( + 'connector' = 'datagen', + 'rows-per-second' = '10' + ) + """) + + stream = t_env.to_append_stream( + t_env.from_path('my_source'), + Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) + watermarked_stream = stream.assign_timestamps_and_watermarks( + WatermarkStrategy.for_monotonous_timestamps() + .with_timestamp_assigner(MyTimestampAssigner())) + + # apply the process function onto a keyed stream + result = watermarked_stream.key_by(lambda value: value[1]) \ + .process(CountWithTimeoutFunction()) \ + .print() + env.execute() +``` +{{< /tab >}} {{< /tabs >}} @@ -281,6 +369,13 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): } ``` {{< /tab >}} +{{< tab "Python" >}} +```python +def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): + key = ctx.get_current_key() + # 
... +``` +{{< /tab >}} {{< /tabs >}} ## Timers @@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000 ctx.timerService.registerProcessingTimeTimer(coalescedTime) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000 +ctx.timer_service().register_processing_time_timer(coalesced_time) +``` +{{< /tab >}} {{< /tabs >}} Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce @@ -345,6 +446,12 @@ val coalescedTime = ctx.timerService.currentWatermark + 1 ctx.timerService.registerEventTimeTimer(coalescedTime) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +coalesced_time = ctx.timer_service().current_watermark() + 1 +ctx.timer_service().register_event_time_timer(coalesced_time) +``` +{{< /tab >}} {{< /tabs >}} Timers can also be stopped and removed as follows: @@ -364,6 +471,12 @@ val timestampOfTimerToStop = ... ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +timestamp_of_timer_to_stop = ... +ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop) +``` +{{< /tab >}} {{< /tabs >}} Stopping an event-time timer: @@ -381,6 +494,12 @@ val timestampOfTimerToStop = ... ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +timestamp_of_timer_to_stop = ... 
+ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop) +``` +{{< /tab >}} {{< /tabs >}} {{< hint info >}} diff --git a/docs/content.zh/docs/dev/python/datastream/operators/process_function.md b/docs/content.zh/docs/dev/python/datastream/operators/process_function.md new file mode 100644 index 0000000..d9f772b --- /dev/null +++ b/docs/content.zh/docs/dev/python/datastream/operators/process_function.md @@ -0,0 +1,52 @@ +--- +title: "Process Function" +weight: 4 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Process Function + +## ProcessFunction + +The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of +all (acyclic) streaming applications: + +- events (stream elements) +- state (fault-tolerant, consistent, only on keyed stream) +- timers (event time and processing time, only on keyed stream) + +The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events +by being invoked for each event received in the input stream(s). + +Please refer to [Process Function]({{< ref "docs/dev/datastream/operators/process_function" >}}) +for more details about the concept and usage of `ProcessFunction`. 
+ +## Execution behavior of timer + +Python user-defined functions are executed in a separate Python process from Flink's operators, which run in a JVM, +so the timer registration requests made in `ProcessFunction` are sent to the Java operator asynchronously. +Once the Java operator receives a timer registration request, it registers the timer with the underlying timer service. + +If the time of the registered timer has already passed (compared against the current system time for processing time timers, +or the current watermark for event time timers), the timer will be triggered immediately. + +Note that, due to this asynchronous processing, a timer may be triggered a little later than its registered time. +For example, a processing time timer registered for `10:00:00` may actually be processed at `10:00:05`. diff --git a/docs/content/docs/dev/datastream/operators/process_function.md b/docs/content/docs/dev/datastream/operators/process_function.md index 90d9ed1..c1335ba 100644 --- a/docs/content/docs/dev/datastream/operators/process_function.md +++ b/docs/content/docs/dev/datastream/operators/process_function.md @@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, Stri } ``` {{< /tab >}} +{{< tab "Python" >}} +```python +import datetime + +from pyflink.common import Row, WatermarkStrategy +from pyflink.common.typeinfo import Types +from pyflink.common.watermark_strategy import TimestampAssigner +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext +from pyflink.datastream.state import ValueStateDescriptor +from pyflink.table import StreamTableEnvironment + + +class CountWithTimeoutFunction(KeyedProcessFunction): + + def __init__(self): + self.state = None + + def open(self, runtime_context: RuntimeContext): + self.state = runtime_context.get_state(ValueStateDescriptor( + "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()]))) + + 
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): + # retrieve the current count + current = self.state.value() + if current is None: + current = Row(value.f1, 0, 0) + + # update the state's count + current[1] += 1 + + # set the state's timestamp to the record's assigned event time timestamp + current[2] = ctx.timestamp() + + # write the state back + self.state.update(current) + + # schedule the next timer 60 seconds from the current event time + ctx.timer_service().register_event_time_timer(current[2] + 60000) + + def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): + # get the state for the key that scheduled the timer + result = self.state.value() + + # check if this is an outdated timer or the latest timer + if timestamp == result[2] + 60000: + # emit the state on timeout + yield result[0], result[1] + + +class MyTimestampAssigner(TimestampAssigner): + + def __init__(self): + self.epoch = datetime.datetime.utcfromtimestamp(0) + + def extract_timestamp(self, value, record_timestamp) -> int: + return int((value[0] - self.epoch).total_seconds() * 1000) + + +if __name__ == '__main__': + env = StreamExecutionEnvironment.get_execution_environment() + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + + t_env.execute_sql(""" + CREATE TABLE my_source ( + a TIMESTAMP(3), + b VARCHAR, + c VARCHAR + ) WITH ( + 'connector' = 'datagen', + 'rows-per-second' = '10' + ) + """) + + stream = t_env.to_append_stream( + t_env.from_path('my_source'), + Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()])) + watermarked_stream = stream.assign_timestamps_and_watermarks( + WatermarkStrategy.for_monotonous_timestamps() + .with_timestamp_assigner(MyTimestampAssigner())) + + # apply the process function onto a keyed stream + result = watermarked_stream.key_by(lambda value: value[1]) \ + .process(CountWithTimeoutFunction()) \ + .print() + env.execute() +``` +{{< /tab >}} {{< /tabs >}} @@ -281,6 +369,13 @@ 
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): } ``` {{< /tab >}} +{{< tab "Python" >}} +```python +def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): + key = ctx.get_current_key() + # ... +``` +{{< /tab >}} {{< /tabs >}} ## Timers @@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000 ctx.timerService.registerProcessingTimeTimer(coalescedTime) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000 +ctx.timer_service().register_processing_time_timer(coalesced_time) +``` +{{< /tab >}} {{< /tabs >}} Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce @@ -345,6 +446,12 @@ val coalescedTime = ctx.timerService.currentWatermark + 1 ctx.timerService.registerEventTimeTimer(coalescedTime) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +coalesced_time = ctx.timer_service().current_watermark() + 1 +ctx.timer_service().register_event_time_timer(coalesced_time) +``` +{{< /tab >}} {{< /tabs >}} Timers can also be stopped and removed as follows: @@ -364,6 +471,12 @@ val timestampOfTimerToStop = ... ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +timestamp_of_timer_to_stop = ... +ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop) +``` +{{< /tab >}} {{< /tabs >}} Stopping an event-time timer: @@ -381,6 +494,12 @@ val timestampOfTimerToStop = ... ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +timestamp_of_timer_to_stop = ... 
+ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop) +``` +{{< /tab >}} {{< /tabs >}} {{< hint info >}} diff --git a/docs/content/docs/dev/python/datastream/operators/process_function.md b/docs/content/docs/dev/python/datastream/operators/process_function.md new file mode 100644 index 0000000..d9f772b --- /dev/null +++ b/docs/content/docs/dev/python/datastream/operators/process_function.md @@ -0,0 +1,52 @@ +--- +title: "Process Function" +weight: 4 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Process Function + +## ProcessFunction + +The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of +all (acyclic) streaming applications: + +- events (stream elements) +- state (fault-tolerant, consistent, only on keyed stream) +- timers (event time and processing time, only on keyed stream) + +The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events +by being invoked for each event received in the input stream(s). + +Please refer to [Process Function]({{< ref "docs/dev/datastream/operators/process_function" >}}) +for more details about the concept and usage of `ProcessFunction`. 
+ +## Execution behavior of timer + +Python user-defined functions are executed in a separate Python process from Flink's operators, which run in a JVM, +so the timer registration requests made in `ProcessFunction` are sent to the Java operator asynchronously. +Once the Java operator receives a timer registration request, it registers the timer with the underlying timer service. + +If the time of the registered timer has already passed (compared against the current system time for processing time timers, +or the current watermark for event time timers), the timer will be triggered immediately. + +Note that, due to this asynchronous processing, a timer may be triggered a little later than its registered time. +For example, a processing time timer registered for `10:00:00` may actually be processed at `10:00:05`.