This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bc84294b979 [FLINK-38560][python] Support ordered mode for async function in Python DataStream API
bc84294b979 is described below
commit bc84294b979d4c32b33a3eedf845d3914a58e34f
Author: Dian Fu <[email protected]>
AuthorDate: Wed Oct 29 13:34:43 2025 +0800
[FLINK-38560][python] Support ordered mode for async function in Python DataStream API
This closes #27170.
---
.../pyflink/datastream/async_data_stream.py | 59 +++++++++++++++
.../datastream/tests/test_async_function.py | 26 ++++++-
.../datastream/process/async_function/operation.py | 4 +-
.../datastream/process/async_function/queue.py | 85 ++++++++++++++++++++--
4 files changed, 166 insertions(+), 8 deletions(-)
diff --git a/flink-python/pyflink/datastream/async_data_stream.py
b/flink-python/pyflink/datastream/async_data_stream.py
index 0e14533c4d8..1fd3ff1d310 100644
--- a/flink-python/pyflink/datastream/async_data_stream.py
+++ b/flink-python/pyflink/datastream/async_data_stream.py
@@ -89,6 +89,65 @@ class AsyncDataStream(object):
j_output_type_info,
j_python_data_stream_function_operator))
+ @staticmethod
+ def ordered_wait(
+ data_stream: DataStream,
+ async_function: AsyncFunction,
+ timeout: Time,
+ capacity: int = 100,
+ output_type: TypeInformation = None) -> 'DataStream':
+ """
+ Adds an async function to the data stream. The order to process input
records
+ is guaranteed to be the same as input ones.
+
+ :param data_stream: The input data stream.
+ :param async_function: The async function.
+ :param timeout: The timeout for the asynchronous operation to complete.
+ :param capacity: The max number of async i/o operation that can be
triggered.
+ :param output_type: The output data type.
+ :return: The transformed DataStream.
+ """
+ return AsyncDataStream.ordered_wait_with_retry(
+ data_stream, async_function, timeout,
async_retry_strategies.NO_RETRY_STRATEGY,
+ capacity, output_type)
+
+ @staticmethod
+ def ordered_wait_with_retry(
+ data_stream: DataStream,
+ async_function: AsyncFunction,
+ timeout: Time,
+ async_retry_strategy: AsyncRetryStrategy,
+ capacity: int = 100,
+ output_type: TypeInformation = None) -> 'DataStream':
+ """
+ Adds an async function with an AsyncRetryStrategy to support retry of
AsyncFunction to the
+ data stream. The order to process input records is guaranteed to be
the same as input ones.
+
+ :param data_stream: The input data stream.
+ :param async_function: The async function.
+ :param timeout: The timeout for the asynchronous operation to complete.
+ :param async_retry_strategy: The strategy of reattempt async i/o
operation that can be
+ triggered
+ :param capacity: The max number of async i/o operation that can be
triggered.
+ :param output_type: The output data type.
+ :return: The transformed DataStream.
+ """
+ AsyncDataStream._validate(data_stream, async_function, timeout,
async_retry_strategy)
+
+ from pyflink.fn_execution import flink_fn_execution_pb2
+ j_python_data_stream_function_operator, j_output_type_info = \
+ _get_one_input_stream_operator(
+ data_stream,
+ AsyncFunctionDescriptor(
+ async_function, timeout, capacity, async_retry_strategy,
+ AsyncFunctionDescriptor.OutputMode.ORDERED),
+ flink_fn_execution_pb2.UserDefinedDataStreamFunction.PROCESS,
# type: ignore
+ output_type)
+ return DataStream(data_stream._j_data_stream.transform(
+ "async wait operator",
+ j_output_type_info,
+ j_python_data_stream_function_operator))
+
@staticmethod
def _validate(data_stream: DataStream, async_function: AsyncFunction,
timeout: Time, async_retry_strategy: AsyncRetryStrategy) ->
None:
diff --git a/flink-python/pyflink/datastream/tests/test_async_function.py
b/flink-python/pyflink/datastream/tests/test_async_function.py
index 30fe8d50022..a63478bf359 100644
--- a/flink-python/pyflink/datastream/tests/test_async_function.py
+++ b/flink-python/pyflink/datastream/tests/test_async_function.py
@@ -45,7 +45,7 @@ class AsyncFunctionTests(PyFlinkStreamingTestCase):
def assert_equals(self, expected, actual):
self.assertEqual(expected, actual)
- def test_basic_functionality(self):
+ def test_unordered_mode(self):
self.env.set_parallelism(1)
ds = self.env.from_collection(
[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)],
@@ -69,6 +69,30 @@ class AsyncFunctionTests(PyFlinkStreamingTestCase):
expected = ['2', '4', '6', '8', '10']
self.assert_equals_sorted(expected, results)
+ def test_ordered_mode(self):
+ self.env.set_parallelism(1)
+ ds = self.env.from_collection(
+ [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)],
+ type_info=Types.ROW_NAMED(["v1", "v2"], [Types.INT(), Types.INT()])
+ )
+
+ class MyAsyncFunction(AsyncFunction):
+
+ async def async_invoke(self, value: Row, result_future:
ResultFuture[int]):
+ await asyncio.sleep(random.randint(1, 2))
+ result_future.complete([value[0] + value[1]])
+
+ def timeout(self, value: Row, result_future: ResultFuture[int]):
+ result_future.complete([value[0] + value[1]])
+
+ ds = AsyncDataStream.ordered_wait(
+ ds, MyAsyncFunction(), Time.seconds(5), 2, Types.INT())
+ ds.add_sink(self.test_sink)
+ self.env.execute()
+ results = self.test_sink.get_results(False)
+ expected = ['2', '4', '6', '8', '10']
+ self.assert_equals(expected, results)
+
def test_watermark(self):
self.env.set_parallelism(1)
ds = self.env.from_collection(
diff --git
a/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
b/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
index 467ebc05ded..1f30e2ba303 100644
---
a/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
+++
b/flink-python/pyflink/fn_execution/datastream/process/async_function/operation.py
@@ -24,7 +24,7 @@ from typing import TypeVar, Generic, List, Iterable,
Callable, Optional
from pyflink.datastream import RuntimeContext, ResultFuture
from pyflink.datastream.functions import AsyncFunctionDescriptor,
AsyncRetryStrategy
from pyflink.fn_execution.datastream.process.async_function.queue import \
- UnorderedStreamElementQueue, StreamElementQueue
+ UnorderedStreamElementQueue, StreamElementQueue, OrderedStreamElementQueue
from pyflink.fn_execution.datastream.process.operations import Operation
from pyflink.fn_execution.datastream.process.runtime_context import
StreamingRuntimeContext
@@ -291,7 +291,7 @@ class AsyncOperation(Operation):
if output_mode == AsyncFunctionDescriptor.OutputMode.UNORDERED:
self._queue = UnorderedStreamElementQueue(capacity,
self._raise_exception_if_exists)
else:
- raise NotImplementedError("ORDERED mode is still not supported.")
+ self._queue = OrderedStreamElementQueue(capacity,
self._raise_exception_if_exists)
self._emitter = None
self._async_function_runner = None
self._exception = None
diff --git
a/flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py
b/flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py
index 08ac20f0681..0f4b46affb7 100644
---
a/flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py
+++
b/flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py
@@ -17,7 +17,7 @@
################################################################################
import collections
import threading
-from abc import ABC
+from abc import ABC, abstractmethod
from typing import Generic, TypeVar, List
from pyflink.datastream import ResultFuture
@@ -34,12 +34,14 @@ class StreamElementQueueEntry(ABC, ResultFuture,
Generic[OUT]):
allows to set the result of a completed entry through ResultFuture.
"""
+ @abstractmethod
def is_done(self) -> bool:
"""
True if the stream element queue entry has been completed; otherwise
false.
"""
pass
+ @abstractmethod
def emit_result(self, output_processor) -> int:
"""
Emits the results associated with this queue entry.
@@ -109,6 +111,7 @@ class WatermarkQueueEntry(StreamElementQueueEntry):
class StreamElementQueue(ABC, Generic[OUT]):
+ @abstractmethod
def put(self, windowed_value, timestamp, watermark, record) ->
ResultFuture[OUT]:
"""
Put the given record in the queue. This operation blocks until the
queue has
@@ -125,6 +128,7 @@ class StreamElementQueue(ABC, Generic[OUT]):
"""
pass
+ @abstractmethod
def advance_watermark(self, watermark):
"""
Tries to put the given watermark in the queue. This operation succeeds
if the queue has
@@ -134,6 +138,7 @@ class StreamElementQueue(ABC, Generic[OUT]):
"""
pass
+ @abstractmethod
def emit_completed_element(self, output_processor):
"""
Emits one completed element from the head of this queue into the given
output.
@@ -142,30 +147,35 @@ class StreamElementQueue(ABC, Generic[OUT]):
"""
pass
+ @abstractmethod
def has_completed_elements(self) -> bool:
"""
Checks if there is at least one completed head element.
"""
pass
+ @abstractmethod
def wait_for_completed_elements(self):
"""
Waits until there is completed elements.
"""
pass
- def wait_for_in_flight_elements_processed(self):
+ @abstractmethod
+ def wait_for_in_flight_elements_processed(self, timeout=1):
"""
Waits until any inflight elements have been processed.
"""
pass
+ @abstractmethod
def is_empty(self) -> bool:
"""
True if the queue is empty; otherwise false.
"""
pass
+ @abstractmethod
def size(self) -> int:
"""
Return the size of the queue.
@@ -265,9 +275,10 @@ class UnorderedStreamElementQueue(StreamElementQueue):
return entry
def advance_watermark(self, watermark):
- with self._lock:
- if watermark > self._current_watermark:
- self._current_watermark = watermark
+ if watermark > self._current_watermark:
+ self._current_watermark = watermark
+
+ with self._lock:
self._add_watermark(watermark)
def emit_completed_element(self, output_processor):
@@ -343,3 +354,67 @@ class UnorderedStreamElementQueue(StreamElementQueue):
new_segment = UnorderedStreamElementQueue.Segment(capacity)
self._segments.append(new_segment)
return new_segment
+
+
+class OrderedStreamElementQueue(StreamElementQueue):
+
+ def __init__(self, capacity: int, exception_checker):
+ self._capacity = capacity
+ self._exception_checker = exception_checker
+ self._queue = collections.deque()
+ self._lock = threading.RLock()
+ self._not_full = threading.Condition(self._lock)
+ self._not_empty = threading.Condition(self._lock)
+ self._number_of_pending_entries = 0
+
+ def put(self, windowed_value, timestamp, watermark, record) ->
ResultFuture[OUT]:
+ with self._not_full:
+ while self.size() >= self._capacity:
+ self._not_full.wait(1)
+ self._exception_checker()
+
+ entry = StreamRecordQueueEntry(windowed_value, timestamp,
watermark, record)
+ entry.on_complete(self.on_complete_handler)
+ self._queue.append(entry)
+ self._number_of_pending_entries += 1
+ return entry
+
+ def advance_watermark(self, watermark):
+ # do nothing in ordered mode
+ pass
+
+ def emit_completed_element(self, output_processor):
+ with self._not_full:
+ if not self.has_completed_elements():
+ return
+
+ self._queue.popleft().emit_result(output_processor)
+ self._number_of_pending_entries -= 1
+ self._not_full.notify_all()
+
+ def has_completed_elements(self) -> bool:
+ with self._lock:
+ return len(self._queue) > 0 and self._queue[0].is_done()
+
+ def wait_for_completed_elements(self):
+ with self._not_empty:
+ while not self.has_completed_elements():
+ self._not_empty.wait()
+
+ def wait_for_in_flight_elements_processed(self, timeout=1):
+ with self._not_full:
+ if self._number_of_pending_entries != 0:
+ self._not_full.wait(timeout)
+
+ def is_empty(self) -> bool:
+ with self._lock:
+ return self._number_of_pending_entries == 0
+
+ def size(self) -> int:
+ with self._lock:
+ return self._number_of_pending_entries
+
+ def on_complete_handler(self, entry):
+ with self._not_empty:
+ if self.has_completed_elements():
+ self._not_empty.notify()