This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b34548a1b68 [FLINK-38591][python] Add Python AsyncFunction example (#27198)
b34548a1b68 is described below

commit b34548a1b686cea27d51fc1353b69e1f5b80c593
Author: Dian Fu <[email protected]>
AuthorDate: Thu Nov 6 10:30:16 2025 +0800

    [FLINK-38591][python] Add Python AsyncFunction example (#27198)
---
 .../datastream/asyncio/remote_model_inference.py   | 150 +++++++++++++++++++++
 .../datastream/process/async_function/operation.py |   1 +
 2 files changed, 151 insertions(+)

diff --git a/flink-python/pyflink/examples/datastream/asyncio/remote_model_inference.py b/flink-python/pyflink/examples/datastream/asyncio/remote_model_inference.py
new file mode 100644
index 00000000000..9eaff42e8b1
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/asyncio/remote_model_inference.py
@@ -0,0 +1,150 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import argparse
+import asyncio
+import functools
+import json
+import logging
+import random
+import sys
+from typing import List
+
+from pyflink.common import Encoder, Types, Time, Row
+from pyflink.datastream import StreamExecutionEnvironment, AsyncDataStream, 
AsyncFunction, \
+    RuntimeContext, AsyncRetryStrategy, async_retry_predicates, 
CheckpointingMode
+from pyflink.datastream.connectors.file_system import (FileSink, 
OutputFileConfig, RollingPolicy)
+from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, 
DataTypes
+
+
class AsyncLLMRequest(AsyncFunction[Row, str]):
    """Simulated asynchronous remote model inference (e.g. an LLM request).

    Each input row is read through its ``user_id`` and ``comments`` fields and
    turned into a JSON string carrying an ``analysis_result``. The remote call
    is simulated with ``asyncio.sleep``. An attempt for a ``user_id`` that has
    no pending retry marker raises ``TimeoutError`` with probability 3/10,
    which exercises the retry strategy. The next attempt for that ``user_id``
    then succeeds.
    """

    def __init__(self):
        # user_ids whose latest attempt raised TimeoutError; only membership
        # is ever checked, so a set is sufficient
        self.retried_keys = set()

    def open(self, runtime_context: RuntimeContext):
        # create model inference client here
        pass

    def close(self):
        # close the model inference client here
        pass

    async def async_invoke(self, value: Row) -> List[str]:
        """Issue the (simulated) inference request for ``value``.

        :param value: row with ``user_id`` and ``comments`` fields.
        :return: a one-element list holding the JSON-encoded analysis result.
        :raises TimeoutError: to simulate a remote request timing out.
        """
        # issue the asynchronous request
        await asyncio.sleep(random.randint(1, 2))

        user_id = value.user_id
        if user_id not in self.retried_keys and random.randint(1, 10) % 3 == 0:
            # remote model inference request may time out; remember the key so
            # that the next attempt for it takes the success path
            self.retried_keys.add(user_id)
            raise TimeoutError

        # remote model inference request completes; stop tracking the key
        self.retried_keys.discard(user_id)
        # note that the result should be a collection even if there is only
        # one result
        analysis_result = "positive"
        result = {
            "user_id": user_id,
            "comments": value.comments,
            "analysis_result": analysis_result
        }
        return [json.dumps(result)]

    def timeout(self, value: Row) -> List[str]:
        """Return a default result for ``value`` when the request times out.

        :param value: row with ``user_id`` and ``comments`` fields.
        :return: a one-element list holding the JSON-encoded result whose
            ``analysis_result`` is ``None``.
        """
        # clear any retry marker left by a failed attempt of this element so
        # it does not accumulate in retried_keys
        self.retried_keys.discard(value.user_id)
        result = {
            "user_id": value.user_id,
            "comments": value.comments,
            "analysis_result": None
        }
        return [json.dumps(result)]
+
+
def main(output_path):
    """Build and run the asynchronous model-inference pipeline.

    Random ``(user_id, comments)`` rows come from a ``datagen`` table. Each row
    is sent through ``AsyncLLMRequest`` with a fixed-delay retry strategy whose
    exception predicate matches ``TimeoutError``. The JSON results are written
    to ``output_path`` when one is given, and printed to stdout otherwise.

    :param output_path: base path for the ``FileSink``, or ``None`` to print.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    # checkpoint every 30000 ms with exactly-once semantics
    env.enable_checkpointing(30000, CheckpointingMode.EXACTLY_ONCE)
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # source: user_id, comments
    source_schema = (Schema.new_builder()
                     .column('user_id', DataTypes.INT())
                     .column('comments', DataTypes.STRING())
                     .build())
    source_descriptor = (TableDescriptor.for_connector('datagen')
                         .schema(source_schema)
                         .option('fields.user_id.kind', 'random')
                         .option('fields.comments.kind', 'random')
                         .option('rows-per-second', '100')
                         .build())
    t_env.create_temporary_table('source', source_descriptor)

    input_stream = t_env.to_data_stream(t_env.from_path('source'))

    # create an async retry strategy via utility class or a user defined strategy;
    # the exception predicate only matches TimeoutError
    retry_on_timeout = functools.partial(
        async_retry_predicates.exception_type_predicate,
        expected_error_type=TimeoutError)
    retry_strategy = AsyncRetryStrategy.fixed_delay(
        max_attempts=100,
        backoff_time_millis=1000,
        result_predicate=None,
        exception_predicate=retry_on_timeout)

    result_stream = AsyncDataStream.unordered_wait_with_retry(
        data_stream=input_stream,
        async_function=AsyncLLMRequest(),
        timeout=Time.seconds(10),
        async_retry_strategy=retry_strategy,
        capacity=1000,
        output_type=Types.STRING())

    # define the sink
    if output_path is None:
        print("Printing result to stdout. Use --output to specify output path.")
        result_stream.print()
    else:
        file_config = (OutputFileConfig.builder()
                       .with_part_prefix("prefix")
                       .with_part_suffix(".ext")
                       .build())
        file_sink = (FileSink.for_row_format(
                         base_path=output_path,
                         encoder=Encoder.simple_string_encoder())
                     .with_output_file_config(file_config)
                     .with_rolling_policy(RollingPolicy.default_rolling_policy())
                     .build())
        result_stream.sink_to(sink=file_sink)

    # submit for execution
    env.execute()
+
+
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    # parse_known_args: unrecognized arguments are ignored rather than rejected
    known_args, _ = arg_parser.parse_known_args(sys.argv[1:])

    main(known_args.output)
diff --git a/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py b/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
index 6926ac86bc6..8be7b21fa8a 100644
--- a/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
+++ b/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
@@ -153,6 +153,7 @@ class RetryableResultHandler(ResultFuture, Generic[IN, OUT]):
             retry_strategy.get_retry_predicate().exception_predicate() or 
(lambda _: False)
         self._retry_awaiting = AtomicBoolean(False)
         self._current_attempts = 1
+        self._delayed_retry_timer = None
 
     def register_timeout(self, timeout):
         timer = threading.Timer(timeout, self._timer_triggered)

Reply via email to