This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit 2332d109ea8a3aef044a480b57a7d188e5f1f0e6
Author: WenjinXie <[email protected]>
AuthorDate: Fri Sep 26 13:58:54 2025 +0800

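    [runtime][python] Fix NPE when integrating an agent from DataStream to Table.
    
    Pass the StreamTableEnvironment to get_execution_environment() instead of
    from_table(), and create one lazily from the StreamExecutionEnvironment
    when none is given, so that to_table() also works when the agent input
    comes from a DataStream.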
---
 python/flink_agents/api/execution_environment.py   |  9 ++--
 ...gent_example.py => from_datastream_to_table.py} | 60 +++++++++++-----------
 .../integrate_table_with_agent_example.py          |  4 +-
 .../integrate_table_with_react_agent_example.py    |  4 +-
 python/flink_agents/e2e_tests/my_agent.py          | 29 +++++++++++
 .../runtime/local_execution_environment.py         | 13 +++--
 .../runtime/remote_execution_environment.py        | 49 ++++++++++++++----
 7 files changed, 116 insertions(+), 52 deletions(-)

diff --git a/python/flink_agents/api/execution_environment.py b/python/flink_agents/api/execution_environment.py
index 9ccc4a4..a198e6c 100644
--- a/python/flink_agents/api/execution_environment.py
+++ b/python/flink_agents/api/execution_environment.py
@@ -106,7 +106,9 @@ class AgentsExecutionEnvironment(ABC):
 
     @staticmethod
     def get_execution_environment(
-        env: StreamExecutionEnvironment | None = None, **kwargs: Dict[str, Any]
+        env: StreamExecutionEnvironment | None = None,
+        t_env: StreamTableEnvironment | None = None,
+        **kwargs: Dict[str, Any],
     ) -> "AgentsExecutionEnvironment":
         """Get agents execution environment.
 
@@ -123,13 +125,13 @@ class AgentsExecutionEnvironment(ABC):
         if env is None:
             return importlib.import_module(
                 "flink_agents.runtime.local_execution_environment"
-            ).create_instance(env=env, **kwargs)
+            ).create_instance(env=env, t_env=t_env, **kwargs)
         else:
             for path in files("flink_agents.lib").iterdir():
                 env.add_jars(f"file://{path}")
             return importlib.import_module(
                 "flink_agents.runtime.remote_execution_environment"
-            ).create_instance(env=env, **kwargs)
+            ).create_instance(env=env, t_env=t_env, **kwargs)
 
     @abstractmethod
     def get_config(self, path: str | None = None) -> Configuration:
@@ -180,7 +182,6 @@ class AgentsExecutionEnvironment(ABC):
     def from_table(
         self,
         input: Table,
-        t_env: StreamTableEnvironment,
         key_selector: KeySelector | Callable | None = None,
     ) -> AgentBuilder:
         """Set input for agents. Used for remote execution.
diff --git a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py b/python/flink_agents/e2e_tests/from_datastream_to_table.py
similarity index 69%
copy from python/flink_agents/e2e_tests/integrate_table_with_agent_example.py
copy to python/flink_agents/e2e_tests/from_datastream_to_table.py
index 27df5c1..02d7151 100644
--- a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py
+++ b/python/flink_agents/e2e_tests/from_datastream_to_table.py
@@ -17,33 +17,32 @@
 
#################################################################################
 from pathlib import Path
 
-from pyflink.common import Row
+from pyflink.common import Duration, WatermarkStrategy
 from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, 
RowTypeInfo
 from pyflink.datastream import (
     KeySelector,
     RuntimeExecutionMode,
     StreamExecutionEnvironment,
 )
-from pyflink.table import (
-    DataTypes,
-    Schema,
-    StreamTableEnvironment,
-    TableDescriptor,
-)
+from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
+from pyflink.table import DataTypes, Schema, StreamTableEnvironment, 
TableDescriptor
 
 from flink_agents.api.execution_environment import AgentsExecutionEnvironment
-from flink_agents.e2e_tests.my_agent import TableAgent
-
-current_dir = Path(__file__).parent
+from flink_agents.e2e_tests.my_agent import (
+    DataStreamToTableAgent,
+    ItemData,
+)
 
 
 class MyKeySelector(KeySelector):
     """KeySelector for extracting key."""
 
-    def get_key(self, value: Row) -> int:
-        """Extract key from Row."""
-        return value[0]
+    def get_key(self, value: ItemData) -> int:
+        """Extract key from ItemData."""
+        return value.id
+
 
+current_dir = Path(__file__).parent
 
 # if this example raises exception "No module named 'flink_agents'", you could 
set
 # PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/
@@ -53,28 +52,27 @@ if __name__ == "__main__":
 
     env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
     env.set_parallelism(1)
-
     t_env = StreamTableEnvironment.create(stream_execution_environment=env)
 
-    t_env.create_temporary_table(
-        "source",
-        TableDescriptor.for_connector("filesystem")
-        .schema(
-            Schema.new_builder()
-            .column("id", DataTypes.BIGINT())
-            .column("review", DataTypes.STRING())
-            .column("review_score", DataTypes.FLOAT())
-            .build()
+    # currently, bounded source is not supported due to runtime 
implementation, so
+    # we use continuous file source here.
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(), f"file:///{current_dir}/resources"
         )
-        .option("format", "json")
-        .option("path", f"file:///{current_dir}/resources")
-        .option("source.monitor-interval", "60s")
+        .monitor_continuously(Duration.of_minutes(1))
         .build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="streaming_agent_example",
     )
 
-    table = t_env.from_path("source")
+    deserialize_datastream = input_datastream.map(
+        lambda x: ItemData.model_validate_json(x)
+    )
 
-    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(
+        env=env, t_env=t_env
+    )
 
     output_type = ExternalTypeInfo(
         RowTypeInfo(
@@ -95,8 +93,10 @@ if __name__ == "__main__":
     ).build()
 
     output_table = (
-        agents_env.from_table(input=table, t_env=t_env, 
key_selector=MyKeySelector())
-        .apply(TableAgent())
+        agents_env.from_datastream(
+            input=deserialize_datastream, key_selector=MyKeySelector()
+        )
+        .apply(DataStreamToTableAgent())
         .to_table(schema=schema, output_type=output_type)
     )
 
diff --git a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py b/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py
index 27df5c1..5983ae7 100644
--- a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py
+++ b/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py
@@ -74,7 +74,7 @@ if __name__ == "__main__":
 
     table = t_env.from_path("source")
 
-    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, 
t_env=t_env)
 
     output_type = ExternalTypeInfo(
         RowTypeInfo(
@@ -95,7 +95,7 @@ if __name__ == "__main__":
     ).build()
 
     output_table = (
-        agents_env.from_table(input=table, t_env=t_env, 
key_selector=MyKeySelector())
+        agents_env.from_table(input=table, key_selector=MyKeySelector())
         .apply(TableAgent())
         .to_table(schema=schema, output_type=output_type)
     )
diff --git a/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py b/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py
index e335782..30aaeb3 100644
--- a/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py
+++ b/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py
@@ -72,7 +72,7 @@ if __name__ == "__main__":
         ),
     )
 
-    env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env)
+    env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env, 
t_env=t_env)
 
     # register resource to execution environment
     (
@@ -114,7 +114,7 @@ if __name__ == "__main__":
     schema = (Schema.new_builder().column("result", DataTypes.INT())).build()
 
     output_table = (
-        env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector())
+        env.from_table(input=table, key_selector=MyKeySelector())
         .apply(agent)
         .to_table(schema=schema, output_type=output_type)
     )
diff --git a/python/flink_agents/e2e_tests/my_agent.py b/python/flink_agents/e2e_tests/my_agent.py
index 1af739e..8b25a08 100644
--- a/python/flink_agents/e2e_tests/my_agent.py
+++ b/python/flink_agents/e2e_tests/my_agent.py
@@ -21,6 +21,7 @@ import time
 from typing import Any
 
 from pydantic import BaseModel
+from pyflink.common import Row
 
 from flink_agents.api.agent import Agent
 from flink_agents.api.decorators import action, tool
@@ -51,6 +52,7 @@ class ItemData(BaseModel):
 class MyEvent(Event):  # noqa D101
     value: Any
 
+
 class DataStreamAgent(Agent):
     """Agent used for explaining integrating agents with DataStream.
 
@@ -138,3 +140,30 @@ class TableAgent(Agent):
         content = input
         content["review"] += " second action"
         ctx.send_event(OutputEvent(output=content))
+
+
+class DataStreamToTableAgent(Agent):
+    """Agent used for explaining integrating agents from table to datastream.
+
+    Because pemja will find action in this class when execute Agent, we can't
+    define this class directly in example.py for module name will be set
+    to __main__.
+    """
+
+    @action(InputEvent)
+    @staticmethod
+    def first_action(event: Event, ctx: RunnerContext):  # noqa D102
+        input = event.input
+        content = copy.deepcopy(input)
+        content.review += " first action"
+        ctx.send_event(MyEvent(value=content))
+
+    @action(MyEvent)
+    @staticmethod
+    def second_action(event: Event, ctx: RunnerContext):  # noqa D102
+        input = event.value
+        content = input
+        content.review += " second action"
+        ctx.send_event(
+            
OutputEvent(output=Row(**content.model_dump(exclude="memory_info")))
+        )
diff --git a/python/flink_agents/runtime/local_execution_environment.py b/python/flink_agents/runtime/local_execution_environment.py
index 87d4c1b..ebaf78c 100644
--- a/python/flink_agents/runtime/local_execution_environment.py
+++ b/python/flink_agents/runtime/local_execution_environment.py
@@ -41,13 +41,17 @@ class LocalAgentBuilder(AgentBuilder):
     __config: AgentConfiguration
 
     def __init__(
-        self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]], 
config: AgentConfiguration
+        self,
+        env: "LocalExecutionEnvironment",
+        input: List[Dict[str, Any]],
+        config: AgentConfiguration,
     ) -> None:
         """Init empty output list."""
         self.__env = env
         self.__input = input
         self.__output = []
         self.__config = config
+
     def apply(self, agent: Agent) -> AgentBuilder:
         """Create local runner to execute given agent.
 
@@ -130,7 +134,7 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment):
             self.__output.append(output)
 
     def from_datastream(
-        self, input: DataStream, key_selector : KeySelector | Callable | None 
= None
+        self, input: DataStream, key_selector: KeySelector | Callable | None = 
None
     ) -> AgentBuilder:
         """Set input DataStream of agent execution.
 
@@ -142,7 +146,6 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment):
     def from_table(
         self,
         input: Table,
-        t_env: StreamTableEnvironment,
         key_selector: KeySelector | Callable | None = None,
     ) -> AgentBuilder:
         """Set input Table of agent execution.
@@ -154,7 +157,7 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment):
 
 
 def create_instance(
-    env: StreamExecutionEnvironment, **kwargs: Dict[str, Any]
+    env: StreamExecutionEnvironment, t_env: StreamTableEnvironment, **kwargs: 
Any
 ) -> AgentsExecutionEnvironment:
     """Factory function to create a remote agents execution environment.
 
@@ -162,6 +165,8 @@ def create_instance(
     ----------
     env : StreamExecutionEnvironment
         Flink job execution environment.
+    t_env: StreamTableEnvironment
+        Flink job execution table environment.
     **kwargs : Dict[str, Any]
         The dict of parameters to configure the execution environment.
 
diff --git a/python/flink_agents/runtime/remote_execution_environment.py b/python/flink_agents/runtime/remote_execution_environment.py
index 7d649c3..878c527 100644
--- a/python/flink_agents/runtime/remote_execution_environment.py
+++ b/python/flink_agents/runtime/remote_execution_environment.py
@@ -66,6 +66,15 @@ class RemoteAgentBuilder(AgentBuilder):
         self.__config = config
         self.__resources = resources
 
+    @property
+    def t_env(self) -> StreamTableEnvironment:
+        """Get or crate table environment."""
+        if self.__t_env is None:
+            self.__t_env = StreamTableEnvironment.create(
+                stream_execution_environment=self.__env
+            )
+        return self.__t_env
+
     def apply(self, agent: Agent) -> "AgentBuilder":
         """Set agent of execution environment.
 
@@ -134,7 +143,7 @@ class RemoteAgentBuilder(AgentBuilder):
         Table
             Output Table of agent execution.
         """
-        return self.__t_env.from_data_stream(self.to_datastream(output_type), 
schema)
+        return self.t_env.from_data_stream(self.to_datastream(output_type), 
schema)
 
     def to_list(self) -> List[Dict[str, Any]]:
         """Get output list of agent execution.
@@ -149,18 +158,33 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment):
     """Implementation of AgentsExecutionEnvironment for execution with 
DataStream."""
 
     __env: StreamExecutionEnvironment
+    __t_env: StreamTableEnvironment
     __config: AgentConfiguration
 
-    def __init__(self, env: StreamExecutionEnvironment) -> None:
+    def __init__(
+        self,
+        env: StreamExecutionEnvironment,
+        t_env: StreamTableEnvironment | None = None,
+    ) -> None:
         """Init method of RemoteExecutionEnvironment."""
         super().__init__()
         self.__env = env
+        self.__t_env = t_env
         self.__config = AgentConfiguration()
         flink_conf_dir = os.environ.get("FLINK_CONF_DIR")
         if flink_conf_dir is not None:
             config_dir = Path(flink_conf_dir) / "config.yaml"
             self.__config.load_from_file(str(config_dir))
 
+    @property
+    def t_env(self) -> StreamTableEnvironment:
+        """Get or crate table environment."""
+        if self.__t_env is None:
+            self.__t_env = StreamTableEnvironment.create(
+                stream_execution_environment=self.__env
+            )
+        return self.__t_env
+
     def get_config(self, path: str | None = None) -> AgentConfiguration:
         """Get the writable configuration for flink agents.
 
@@ -200,13 +224,15 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment):
         input = self.__process_input_datastream(input, key_selector)
 
         return RemoteAgentBuilder(
-            input=input, config=self.__config, resources=self.resources
+            input=input,
+            config=self.__config,
+            t_env=self.__t_env,
+            resources=self.resources,
         )
 
     def from_table(
         self,
         input: Table,
-        t_env: StreamTableEnvironment,
         key_selector: KeySelector | Callable | None = None,
     ) -> AgentBuilder:
         """Set input Table of agent.
@@ -215,18 +241,19 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment):
         ----------
         input : Table
             Receive a Table as input.
-        t_env: StreamTableEnvironment
-            table environment supports convert table to/from datastream.
         key_selector : KeySelector
             Extract key from each input record.
         """
-        input = t_env.to_data_stream(table=input)
+        input = self.t_env.to_data_stream(table=input)
 
         input = input.map(lambda x: x, output_type=PickledBytesTypeInfo())
 
         input = self.__process_input_datastream(input, key_selector)
         return RemoteAgentBuilder(
-            input=input, config=self.__config, t_env=t_env, 
resources=self.resources
+            input=input,
+            config=self.__config,
+            t_env=self.t_env,
+            resources=self.resources,
         )
 
     def from_list(self, input: List[Dict[str, Any]]) -> 
"AgentsExecutionEnvironment":
@@ -243,7 +270,7 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment):
 
 
 def create_instance(
-    env: StreamExecutionEnvironment, **kwargs: Dict[str, Any]
+    env: StreamExecutionEnvironment, t_env: StreamTableEnvironment, **kwargs: 
Any
 ) -> AgentsExecutionEnvironment:
     """Factory function to create a remote agents execution environment.
 
@@ -251,6 +278,8 @@ def create_instance(
     ----------
     env : StreamExecutionEnvironment
         Flink job execution environment.
+    t_env : StreamTableEnvironment
+        Flink job execution table environment.
     **kwargs : Dict[str, Any]
         The dict of parameters to configure the execution environment.
 
@@ -259,4 +288,4 @@ def create_instance(
     AgentsExecutionEnvironment
         A configured agents execution environment instance.
     """
-    return RemoteExecutionEnvironment(env=env)
+    return RemoteExecutionEnvironment(env=env, t_env=t_env)

Reply via email to