This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new afa7a6f7 [hotfix] Handle durable execute exception properly (#465)
afa7a6f7 is described below
commit afa7a6f7e358cb77d852399933fc64df2c76a6ff
Author: Xuannan <[email protected]>
AuthorDate: Mon Jan 26 14:50:58 2026 +0800
[hotfix] Handle durable execute exception properly (#465)
---
.../e2e_tests_integration/execute_test.py | 53 ++++++++++++++++++++++
.../e2e_tests_integration/execute_test_agent.py | 40 ++++++++++++++++
.../ground_truth/test_execute_async_exception.txt | 3 ++
.../flink_agents/runtime/flink_runner_context.py | 13 ++++--
4 files changed, 106 insertions(+), 3 deletions(-)
diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test.py
index 8118f15b..fec68dad 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test.py
@@ -39,6 +39,7 @@ from flink_agents.e2e_tests.e2e_tests_integration.execute_test_agent import (
ExecuteTestAgent,
ExecuteTestData,
ExecuteTestKeySelector,
+ ExecuteWithAsyncExceptionTestAgent,
ExecuteWithAsyncTestAgent,
)
from flink_agents.e2e_tests.test_utils import check_result
@@ -203,3 +204,55 @@ def test_durable_execute_with_async_flink(tmp_path: Path) -> None:
),
)
+
+def test_durable_execute_async_exception_flink(tmp_path: Path) -> None:
+ """Test durable_execute_async() exception handling in Flink environment."""
+ config = Configuration()
+ config.set_string("state.backend.type", "rocksdb")
+ config.set_string("checkpointing.interval", "1s")
+ config.set_string("restart-strategy.type", "disable")
+ env = StreamExecutionEnvironment.get_execution_environment(config)
+ env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+ env.set_parallelism(1)
+
+ input_datastream = env.from_source(
+ source=FileSource.for_record_stream_format(
+ StreamFormat.text_line_format(),
+ f"file:///{current_dir}/../resources/execute_test_input",
+ ).build(),
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="execute_test_source",
+ )
+
+ deserialize_datastream = input_datastream.map(
+ lambda x: ExecuteTestData.model_validate_json(x)
+ )
+
+ agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+ output_datastream = (
+ agents_env.from_datastream(
+ input=deserialize_datastream, key_selector=ExecuteTestKeySelector()
+ )
+ .apply(ExecuteWithAsyncExceptionTestAgent())
+ .to_datastream()
+ )
+
+ result_dir = tmp_path / "results"
+ result_dir.mkdir(parents=True, exist_ok=True)
+
+ output_datastream.map(lambda x: x.model_dump_json(), Types.STRING()).add_sink(
+ StreamingFileSink.for_row_format(
+ base_path=str(result_dir.absolute()),
+ encoder=Encoder.simple_string_encoder(),
+ ).build()
+ )
+
+ agents_env.execute()
+
+ check_result(
+ result_dir=result_dir,
+ groud_truth_dir=Path(
+ f"{current_dir}/../resources/ground_truth/test_execute_async_exception.txt"
+ ),
+ )
+
diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test_agent.py b/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test_agent.py
index a0b39477..c9a675d4 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test_agent.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test_agent.py
@@ -56,6 +56,21 @@ class ExecuteTestOutput(BaseModel):
result: int
+class ExecuteTestErrorOutput(BaseModel):
+ """Output data model for durable execute exception test.
+
+ Attributes:
+ ----------
+ id : int
+ Unique identifier of the item
+ error : str
+ The error message
+ """
+
+ id: int
+ error: str
+
+
class ExecuteTestKeySelector(KeySelector):
"""KeySelector for extracting key from ExecuteTestData."""
@@ -74,6 +89,11 @@ def multiply_value(x: int, y: int) -> int:
return x * y
+def raise_exception(message: str) -> None:
+ """A function that raises a ValueError for testing."""
+ raise ValueError(message)
+
+
class ExecuteTestAgent(Agent):
"""Agent that uses synchronous durable_execute() method for testing."""
@@ -116,3 +136,23 @@ class ExecuteWithAsyncTestAgent(Agent):
OutputEvent(output=ExecuteTestOutput(id=input_data.id,
result=async_result))
)
+
+class ExecuteWithAsyncExceptionTestAgent(Agent):
+ """Agent that tests exception handling in durable_execute_async()."""
+
+ @action(InputEvent)
+ @staticmethod
+ async def process(event: Event, ctx: RunnerContext) -> None:
+ """Process an event and capture durable_execute_async() exceptions."""
+ input_data: ExecuteTestData = event.input
+ try:
+ await ctx.durable_execute_async(
+ raise_exception, f"Test error: {input_data.value}"
+ )
+ except ValueError as exc:
+ ctx.send_event(
+ OutputEvent(
+ output=ExecuteTestErrorOutput(id=input_data.id, error=str(exc))
+ )
+ )
+
diff --git a/python/flink_agents/e2e_tests/resources/ground_truth/test_execute_async_exception.txt b/python/flink_agents/e2e_tests/resources/ground_truth/test_execute_async_exception.txt
new file mode 100644
index 00000000..6b5e2dd1
--- /dev/null
+++ b/python/flink_agents/e2e_tests/resources/ground_truth/test_execute_async_exception.txt
@@ -0,0 +1,3 @@
+{"id":1,"error":"Test error: 5"}
+{"id":2,"error":"Test error: 15"}
+{"id":3,"error":"Test error: 10"}
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
index 0e8664a0..3592d21e 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -135,14 +135,21 @@ class _DurableAsyncExecutionResult(AsyncExecutionResult):
future = self._executor.submit(self._func, *self._args, **self._kwargs)
while not future.done():
yield
-
- result = future.result()
+ try:
+ result = future.result()
+ except _DurableExecutionException as exc:
+ # Record and re-raise the original exception for better diagnostics.
+ exc.record_and_raise()
# Handle the wrapped result/exception
if isinstance(result, _DurableExecutionResult):
return result.get_result()
elif isinstance(result, _DurableExecutionException):
- result.record_and_raise()
+ error_message = (
+ "Unexpected _DurableExecutionException returned from executor; "
+ "it should have been raised via future.result()."
+ )
+ raise TypeError(error_message) from result.original_exception
else:
return result