This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bc84294b979 [FLINK-38560][python] Support ordered mode for async function in Python DataStream API
bc84294b979 is described below

commit bc84294b979d4c32b33a3eedf845d3914a58e34f
Author: Dian Fu <[email protected]>
AuthorDate: Wed Oct 29 13:34:43 2025 +0800

    [FLINK-38560][python] Support ordered mode for async function in Python DataStream API
    
    This closes #27170.
---
 .../pyflink/datastream/async_data_stream.py        | 59 +++++++++++++++
 .../datastream/tests/test_async_function.py        | 26 ++++++-
 .../datastream/process/async_function/operation.py |  4 +-
 .../datastream/process/async_function/queue.py     | 85 ++++++++++++++++++++--
 4 files changed, 166 insertions(+), 8 deletions(-)

diff --git a/flink-python/pyflink/datastream/async_data_stream.py 
b/flink-python/pyflink/datastream/async_data_stream.py
index 0e14533c4d8..1fd3ff1d310 100644
--- a/flink-python/pyflink/datastream/async_data_stream.py
+++ b/flink-python/pyflink/datastream/async_data_stream.py
@@ -89,6 +89,65 @@ class AsyncDataStream(object):
             j_output_type_info,
             j_python_data_stream_function_operator))
 
+    @staticmethod
+    def ordered_wait(
+            data_stream: DataStream,
+            async_function: AsyncFunction,
+            timeout: Time,
+            capacity: int = 100,
+            output_type: TypeInformation = None) -> 'DataStream':
+        """
+        Adds an async function to the data stream. The output records are guaranteed to be
+        in the same order as the input ones.
+
+        :param data_stream: The input data stream.
+        :param async_function: The async function.
+        :param timeout: The timeout for the asynchronous operation to complete.
+        :param capacity: The max number of async i/o operations that can be triggered.
+        :param output_type: The output data type.
+        :return: The transformed DataStream.
+        """
+        return AsyncDataStream.ordered_wait_with_retry(
+            data_stream, async_function, timeout, 
async_retry_strategies.NO_RETRY_STRATEGY,
+            capacity, output_type)
+
+    @staticmethod
+    def ordered_wait_with_retry(
+            data_stream: DataStream,
+            async_function: AsyncFunction,
+            timeout: Time,
+            async_retry_strategy: AsyncRetryStrategy,
+            capacity: int = 100,
+            output_type: TypeInformation = None) -> 'DataStream':
+        """
+        Adds an async function with an AsyncRetryStrategy to support retry of AsyncFunction to the
+        data stream. The output records are guaranteed to be in the same order as the input ones.
+
+        :param data_stream: The input data stream.
+        :param async_function: The async function.
+        :param timeout: The timeout for the asynchronous operation to complete.
+        :param async_retry_strategy: The strategy for reattempting async i/o operations that can be
+                                     triggered.
+        :param capacity: The max number of async i/o operations that can be triggered.
+        :param output_type: The output data type.
+        :return: The transformed DataStream.
+        """
+        AsyncDataStream._validate(data_stream, async_function, timeout, 
async_retry_strategy)
+
+        from pyflink.fn_execution import flink_fn_execution_pb2
+        j_python_data_stream_function_operator, j_output_type_info = \
+            _get_one_input_stream_operator(
+                data_stream,
+                AsyncFunctionDescriptor(
+                    async_function, timeout, capacity, async_retry_strategy,
+                    AsyncFunctionDescriptor.OutputMode.ORDERED),
+                flink_fn_execution_pb2.UserDefinedDataStreamFunction.PROCESS,  
# type: ignore
+                output_type)
+        return DataStream(data_stream._j_data_stream.transform(
+            "async wait operator",
+            j_output_type_info,
+            j_python_data_stream_function_operator))
+
     @staticmethod
     def _validate(data_stream: DataStream, async_function: AsyncFunction,
                   timeout: Time, async_retry_strategy: AsyncRetryStrategy) -> 
None:
diff --git a/flink-python/pyflink/datastream/tests/test_async_function.py 
b/flink-python/pyflink/datastream/tests/test_async_function.py
index 30fe8d50022..a63478bf359 100644
--- a/flink-python/pyflink/datastream/tests/test_async_function.py
+++ b/flink-python/pyflink/datastream/tests/test_async_function.py
@@ -45,7 +45,7 @@ class AsyncFunctionTests(PyFlinkStreamingTestCase):
     def assert_equals(self, expected, actual):
         self.assertEqual(expected, actual)
 
-    def test_basic_functionality(self):
+    def test_unordered_mode(self):
         self.env.set_parallelism(1)
         ds = self.env.from_collection(
             [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)],
@@ -69,6 +69,30 @@ class AsyncFunctionTests(PyFlinkStreamingTestCase):
         expected = ['2', '4', '6', '8', '10']
         self.assert_equals_sorted(expected, results)
 
+    def test_ordered_mode(self):
+        self.env.set_parallelism(1)
+        ds = self.env.from_collection(
+            [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)],
+            type_info=Types.ROW_NAMED(["v1", "v2"], [Types.INT(), Types.INT()])
+        )
+
+        class MyAsyncFunction(AsyncFunction):
+
+            async def async_invoke(self, value: Row, result_future: 
ResultFuture[int]):
+                await asyncio.sleep(random.randint(1, 2))
+                result_future.complete([value[0] + value[1]])
+
+            def timeout(self, value: Row, result_future: ResultFuture[int]):
+                result_future.complete([value[0] + value[1]])
+
+        ds = AsyncDataStream.ordered_wait(
+            ds, MyAsyncFunction(), Time.seconds(5), 2, Types.INT())
+        ds.add_sink(self.test_sink)
+        self.env.execute()
+        results = self.test_sink.get_results(False)
+        expected = ['2', '4', '6', '8', '10']
+        self.assert_equals(expected, results)
+
     def test_watermark(self):
         self.env.set_parallelism(1)
         ds = self.env.from_collection(
diff --git 
a/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
 
b/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
index 467ebc05ded..1f30e2ba303 100644
--- 
a/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
+++ 
b/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
@@ -24,7 +24,7 @@ from typing import TypeVar, Generic, List, Iterable, 
Callable, Optional
 from pyflink.datastream import RuntimeContext, ResultFuture
 from pyflink.datastream.functions import AsyncFunctionDescriptor, 
AsyncRetryStrategy
 from pyflink.fn_execution.datastream.process.async_function.queue import \
-    UnorderedStreamElementQueue, StreamElementQueue
+    UnorderedStreamElementQueue, StreamElementQueue, OrderedStreamElementQueue
 from pyflink.fn_execution.datastream.process.operations import Operation
 from pyflink.fn_execution.datastream.process.runtime_context import 
StreamingRuntimeContext
 
@@ -291,7 +291,7 @@ class AsyncOperation(Operation):
         if output_mode == AsyncFunctionDescriptor.OutputMode.UNORDERED:
             self._queue = UnorderedStreamElementQueue(capacity, 
self._raise_exception_if_exists)
         else:
-            raise NotImplementedError("ORDERED mode is still not supported.")
+            self._queue = OrderedStreamElementQueue(capacity, 
self._raise_exception_if_exists)
         self._emitter = None
         self._async_function_runner = None
         self._exception = None
diff --git 
a/flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py 
b/flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py
index 08ac20f0681..0f4b46affb7 100644
--- 
a/flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py
+++ 
b/flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py
@@ -17,7 +17,7 @@
 
################################################################################
 import collections
 import threading
-from abc import ABC
+from abc import ABC, abstractmethod
 from typing import Generic, TypeVar, List
 
 from pyflink.datastream import ResultFuture
@@ -34,12 +34,14 @@ class StreamElementQueueEntry(ABC, ResultFuture, 
Generic[OUT]):
     allows to set the result of a completed entry through ResultFuture.
     """
 
+    @abstractmethod
     def is_done(self) -> bool:
         """
         True if the stream element queue entry has been completed; otherwise 
false.
         """
         pass
 
+    @abstractmethod
     def emit_result(self, output_processor) -> int:
         """
         Emits the results associated with this queue entry.
@@ -109,6 +111,7 @@ class WatermarkQueueEntry(StreamElementQueueEntry):
 
 class StreamElementQueue(ABC, Generic[OUT]):
 
+    @abstractmethod
     def put(self, windowed_value, timestamp, watermark, record) -> 
ResultFuture[OUT]:
         """
         Put the given record in the queue. This operation blocks until the 
queue has
@@ -125,6 +128,7 @@ class StreamElementQueue(ABC, Generic[OUT]):
         """
         pass
 
+    @abstractmethod
     def advance_watermark(self, watermark):
         """
         Tries to put the given watermark in the queue. This operation succeeds 
if the queue has
@@ -134,6 +138,7 @@ class StreamElementQueue(ABC, Generic[OUT]):
         """
         pass
 
+    @abstractmethod
     def emit_completed_element(self, output_processor):
         """
         Emits one completed element from the head of this queue into the given 
output.
@@ -142,30 +147,35 @@ class StreamElementQueue(ABC, Generic[OUT]):
         """
         pass
 
+    @abstractmethod
     def has_completed_elements(self) -> bool:
         """
         Checks if there is at least one completed head element.
         """
         pass
 
+    @abstractmethod
     def wait_for_completed_elements(self):
         """
         Waits until there is completed elements.
         """
         pass
 
-    def wait_for_in_flight_elements_processed(self):
+    @abstractmethod
+    def wait_for_in_flight_elements_processed(self, timeout=1):
         """
         Waits until any inflight elements have been processed.
         """
         pass
 
+    @abstractmethod
     def is_empty(self) -> bool:
         """
         True if the queue is empty; otherwise false.
         """
         pass
 
+    @abstractmethod
     def size(self) -> int:
         """
         Return the size of the queue.
@@ -265,9 +275,10 @@ class UnorderedStreamElementQueue(StreamElementQueue):
             return entry
 
     def advance_watermark(self, watermark):
-        with self._lock:
-            if watermark > self._current_watermark:
-                self._current_watermark = watermark
+        if watermark > self._current_watermark:
+            self._current_watermark = watermark
+
+            with self._lock:
                 self._add_watermark(watermark)
 
     def emit_completed_element(self, output_processor):
@@ -343,3 +354,67 @@ class UnorderedStreamElementQueue(StreamElementQueue):
         new_segment = UnorderedStreamElementQueue.Segment(capacity)
         self._segments.append(new_segment)
         return new_segment
+
+
+class OrderedStreamElementQueue(StreamElementQueue):
+
+    def __init__(self, capacity: int, exception_checker):
+        self._capacity = capacity
+        self._exception_checker = exception_checker
+        self._queue = collections.deque()
+        self._lock = threading.RLock()
+        self._not_full = threading.Condition(self._lock)
+        self._not_empty = threading.Condition(self._lock)
+        self._number_of_pending_entries = 0
+
+    def put(self, windowed_value, timestamp, watermark, record) -> 
ResultFuture[OUT]:
+        with self._not_full:
+            while self.size() >= self._capacity:
+                self._not_full.wait(1)
+                self._exception_checker()
+
+            entry = StreamRecordQueueEntry(windowed_value, timestamp, 
watermark, record)
+            entry.on_complete(self.on_complete_handler)
+            self._queue.append(entry)
+            self._number_of_pending_entries += 1
+            return entry
+
+    def advance_watermark(self, watermark):
+        # do nothing in ordered mode
+        pass
+
+    def emit_completed_element(self, output_processor):
+        with self._not_full:
+            if not self.has_completed_elements():
+                return
+
+            self._queue.popleft().emit_result(output_processor)
+            self._number_of_pending_entries -= 1
+            self._not_full.notify_all()
+
+    def has_completed_elements(self) -> bool:
+        with self._lock:
+            return len(self._queue) > 0 and self._queue[0].is_done()
+
+    def wait_for_completed_elements(self):
+        with self._not_empty:
+            while not self.has_completed_elements():
+                self._not_empty.wait()
+
+    def wait_for_in_flight_elements_processed(self, timeout=1):
+        with self._not_full:
+            if self._number_of_pending_entries != 0:
+                self._not_full.wait(timeout)
+
+    def is_empty(self) -> bool:
+        with self._lock:
+            return self._number_of_pending_entries == 0
+
+    def size(self) -> int:
+        with self._lock:
+            return self._number_of_pending_entries
+
+    def on_complete_handler(self, entry):
+        with self._not_empty:
+            if self.has_completed_elements():
+                self._not_empty.notify()

Reply via email to