This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new afa7a6f7 [hotfix] Handle durable execute exception properly (#465)
afa7a6f7 is described below

commit afa7a6f7e358cb77d852399933fc64df2c76a6ff
Author: Xuannan <[email protected]>
AuthorDate: Mon Jan 26 14:50:58 2026 +0800

    [hotfix] Handle durable execute exception properly (#465)
---
 .../e2e_tests_integration/execute_test.py          | 53 ++++++++++++++++++++++
 .../e2e_tests_integration/execute_test_agent.py    | 40 ++++++++++++++++
 .../ground_truth/test_execute_async_exception.txt  |  3 ++
 .../flink_agents/runtime/flink_runner_context.py   | 13 ++++--
 4 files changed, 106 insertions(+), 3 deletions(-)

diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test.py
index 8118f15b..fec68dad 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test.py
@@ -39,6 +39,7 @@ from flink_agents.e2e_tests.e2e_tests_integration.execute_test_agent import (
     ExecuteTestAgent,
     ExecuteTestData,
     ExecuteTestKeySelector,
+    ExecuteWithAsyncExceptionTestAgent,
     ExecuteWithAsyncTestAgent,
 )
 from flink_agents.e2e_tests.test_utils import check_result
@@ -203,3 +204,55 @@ def test_durable_execute_with_async_flink(tmp_path: Path) -> None:
         ),
     )
 
+
+def test_durable_execute_async_exception_flink(tmp_path: Path) -> None:
+    """Test durable_execute_async() exception handling in Flink environment."""
+    config = Configuration()
+    config.set_string("state.backend.type", "rocksdb")
+    config.set_string("checkpointing.interval", "1s")
+    config.set_string("restart-strategy.type", "disable")
+    env = StreamExecutionEnvironment.get_execution_environment(config)
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(),
+            f"file:///{current_dir}/../resources/execute_test_input",
+        ).build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="execute_test_source",
+    )
+
+    deserialize_datastream = input_datastream.map(
+        lambda x: ExecuteTestData.model_validate_json(x)
+    )
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    output_datastream = (
+        agents_env.from_datastream(
+            input=deserialize_datastream, key_selector=ExecuteTestKeySelector()
+        )
+        .apply(ExecuteWithAsyncExceptionTestAgent())
+        .to_datastream()
+    )
+
+    result_dir = tmp_path / "results"
+    result_dir.mkdir(parents=True, exist_ok=True)
+
+    output_datastream.map(lambda x: x.model_dump_json(), Types.STRING()).add_sink(
+        StreamingFileSink.for_row_format(
+            base_path=str(result_dir.absolute()),
+            encoder=Encoder.simple_string_encoder(),
+        ).build()
+    )
+
+    agents_env.execute()
+
+    check_result(
+        result_dir=result_dir,
+        groud_truth_dir=Path(
+            f"{current_dir}/../resources/ground_truth/test_execute_async_exception.txt"
+        ),
+    )
+
diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test_agent.py b/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test_agent.py
index a0b39477..c9a675d4 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test_agent.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/execute_test_agent.py
@@ -56,6 +56,21 @@ class ExecuteTestOutput(BaseModel):
     result: int
 
 
+class ExecuteTestErrorOutput(BaseModel):
+    """Output data model for durable execute exception test.
+
+    Attributes:
+    ----------
+    id : int
+        Unique identifier of the item
+    error : str
+        The error message
+    """
+
+    id: int
+    error: str
+
+
 class ExecuteTestKeySelector(KeySelector):
     """KeySelector for extracting key from ExecuteTestData."""
 
@@ -74,6 +89,11 @@ def multiply_value(x: int, y: int) -> int:
     return x * y
 
 
+def raise_exception(message: str) -> None:
+    """A function that raises a ValueError for testing."""
+    raise ValueError(message)
+
+
 class ExecuteTestAgent(Agent):
     """Agent that uses synchronous durable_execute() method for testing."""
 
@@ -116,3 +136,23 @@ class ExecuteWithAsyncTestAgent(Agent):
             OutputEvent(output=ExecuteTestOutput(id=input_data.id, result=async_result))
         )
 
+
+class ExecuteWithAsyncExceptionTestAgent(Agent):
+    """Agent that tests exception handling in durable_execute_async()."""
+
+    @action(InputEvent)
+    @staticmethod
+    async def process(event: Event, ctx: RunnerContext) -> None:
+        """Process an event and capture durable_execute_async() exceptions."""
+        input_data: ExecuteTestData = event.input
+        try:
+            await ctx.durable_execute_async(
+                raise_exception, f"Test error: {input_data.value}"
+            )
+        except ValueError as exc:
+            ctx.send_event(
+                OutputEvent(
+                    output=ExecuteTestErrorOutput(id=input_data.id, error=str(exc))
+                )
+            )
+
diff --git a/python/flink_agents/e2e_tests/resources/ground_truth/test_execute_async_exception.txt b/python/flink_agents/e2e_tests/resources/ground_truth/test_execute_async_exception.txt
new file mode 100644
index 00000000..6b5e2dd1
--- /dev/null
+++ b/python/flink_agents/e2e_tests/resources/ground_truth/test_execute_async_exception.txt
@@ -0,0 +1,3 @@
+{"id":1,"error":"Test error: 5"}
+{"id":2,"error":"Test error: 15"}
+{"id":3,"error":"Test error: 10"}
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
index 0e8664a0..3592d21e 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -135,14 +135,21 @@ class _DurableAsyncExecutionResult(AsyncExecutionResult):
         future = self._executor.submit(self._func, *self._args, **self._kwargs)
         while not future.done():
             yield
-
-        result = future.result()
+        try:
+            result = future.result()
+        except _DurableExecutionException as exc:
+            # Record and re-raise the original exception for better diagnostics.
+            exc.record_and_raise()
 
         # Handle the wrapped result/exception
         if isinstance(result, _DurableExecutionResult):
             return result.get_result()
         elif isinstance(result, _DurableExecutionException):
-            result.record_and_raise()
+            error_message = (
+                "Unexpected _DurableExecutionException returned from executor; "
+                "it should have been raised via future.result()."
+            )
+            raise TypeError(error_message) from result.original_exception
         else:
             return result
 

Reply via email to