This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 2332d109ea8a3aef044a480b57a7d188e5f1f0e6 Author: WenjinXie <[email protected]> AuthorDate: Fri Sep 26 13:58:54 2025 +0800 [runtime][python] Fix npe when integrate agent from datastream to table. fix --- python/flink_agents/api/execution_environment.py | 9 ++-- ...gent_example.py => from_datastream_to_table.py} | 60 +++++++++++----------- .../integrate_table_with_agent_example.py | 4 +- .../integrate_table_with_react_agent_example.py | 4 +- python/flink_agents/e2e_tests/my_agent.py | 29 +++++++++++ .../runtime/local_execution_environment.py | 13 +++-- .../runtime/remote_execution_environment.py | 49 ++++++++++++++---- 7 files changed, 116 insertions(+), 52 deletions(-) diff --git a/python/flink_agents/api/execution_environment.py b/python/flink_agents/api/execution_environment.py index 9ccc4a4..a198e6c 100644 --- a/python/flink_agents/api/execution_environment.py +++ b/python/flink_agents/api/execution_environment.py @@ -106,7 +106,9 @@ class AgentsExecutionEnvironment(ABC): @staticmethod def get_execution_environment( - env: StreamExecutionEnvironment | None = None, **kwargs: Dict[str, Any] + env: StreamExecutionEnvironment | None = None, + t_env: StreamTableEnvironment | None = None, + **kwargs: Dict[str, Any], ) -> "AgentsExecutionEnvironment": """Get agents execution environment. @@ -123,13 +125,13 @@ class AgentsExecutionEnvironment(ABC): if env is None: return importlib.import_module( "flink_agents.runtime.local_execution_environment" - ).create_instance(env=env, **kwargs) + ).create_instance(env=env, t_env=t_env, **kwargs) else: for path in files("flink_agents.lib").iterdir(): env.add_jars(f"file://{path}") return importlib.import_module( "flink_agents.runtime.remote_execution_environment" - ).create_instance(env=env, **kwargs) + ).create_instance(env=env, t_env=t_env, **kwargs) @abstractmethod def get_config(self, path: str | None = None) -> Configuration: @@ -180,7 +182,6 @@ class AgentsExecutionEnvironment(ABC): def from_table( self, input: Table, - t_env: StreamTableEnvironment, key_selector: KeySelector | Callable | None = None, ) -> AgentBuilder: """Set input for agents. Used for remote execution. diff --git a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py b/python/flink_agents/e2e_tests/from_datastream_to_table.py similarity index 69% copy from python/flink_agents/e2e_tests/integrate_table_with_agent_example.py copy to python/flink_agents/e2e_tests/from_datastream_to_table.py index 27df5c1..02d7151 100644 --- a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py +++ b/python/flink_agents/e2e_tests/from_datastream_to_table.py @@ -17,33 +17,32 @@ ################################################################################# from pathlib import Path -from pyflink.common import Row +from pyflink.common import Duration, WatermarkStrategy from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo from pyflink.datastream import ( KeySelector, RuntimeExecutionMode, StreamExecutionEnvironment, ) -from pyflink.table import ( - DataTypes, - Schema, - StreamTableEnvironment, - TableDescriptor, -) +from pyflink.datastream.connectors.file_system import FileSource, StreamFormat +from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.e2e_tests.my_agent import TableAgent - -current_dir = Path(__file__).parent +from flink_agents.e2e_tests.my_agent import ( + DataStreamToTableAgent, + ItemData, +) class MyKeySelector(KeySelector): """KeySelector for extracting key.""" - def get_key(self, value: Row) -> int: - """Extract key from Row.""" - return value[0] + def get_key(self, value: ItemData) -> int: + """Extract key from ItemData.""" + return value.id + +current_dir = Path(__file__).parent # if this example raises exception "No module named 'flink_agents'", you could set # PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/ @@ -53,28 +52,27 @@ if __name__ == "__main__": env.set_runtime_mode(RuntimeExecutionMode.STREAMING) env.set_parallelism(1) - t_env = StreamTableEnvironment.create(stream_execution_environment=env) - t_env.create_temporary_table( - "source", - TableDescriptor.for_connector("filesystem") - .schema( - Schema.new_builder() - .column("id", DataTypes.BIGINT()) - .column("review", DataTypes.STRING()) - .column("review_score", DataTypes.FLOAT()) - .build() + # currently, bounded source is not supported due to runtime implementation, so + # we use continuous file source here. + input_datastream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), f"file:///{current_dir}/resources" ) - .option("format", "json") - .option("path", f"file:///{current_dir}/resources") - .option("source.monitor-interval", "60s") + .monitor_continuously(Duration.of_minutes(1)) .build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="streaming_agent_example", ) - table = t_env.from_path("source") + deserialize_datastream = input_datastream.map( + lambda x: ItemData.model_validate_json(x) + ) - agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + agents_env = AgentsExecutionEnvironment.get_execution_environment( + env=env, t_env=t_env + ) output_type = ExternalTypeInfo( RowTypeInfo( @@ -95,8 +93,10 @@ if __name__ == "__main__": ).build() output_table = ( - agents_env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector()) - .apply(TableAgent()) + agents_env.from_datastream( + input=deserialize_datastream, key_selector=MyKeySelector() + ) + .apply(DataStreamToTableAgent()) .to_table(schema=schema, output_type=output_type) ) diff --git a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py b/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py index 27df5c1..5983ae7 100644 --- a/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py +++ b/python/flink_agents/e2e_tests/integrate_table_with_agent_example.py @@ -74,7 +74,7 @@ if __name__ == "__main__": table = t_env.from_path("source") - agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, t_env=t_env) output_type = ExternalTypeInfo( RowTypeInfo( @@ -95,7 +95,7 @@ if __name__ == "__main__": ).build() output_table = ( - agents_env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector()) + agents_env.from_table(input=table, key_selector=MyKeySelector()) .apply(TableAgent()) .to_table(schema=schema, output_type=output_type) ) diff --git a/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py b/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py index e335782..30aaeb3 100644 --- a/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py +++ b/python/flink_agents/e2e_tests/integrate_table_with_react_agent_example.py @@ -72,7 +72,7 @@ if __name__ == "__main__": ), ) - env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env) + env = AgentsExecutionEnvironment.get_execution_environment(env=stream_env, t_env=t_env) # register resource to execution environment ( @@ -114,7 +114,7 @@ if __name__ == "__main__": schema = (Schema.new_builder().column("result", DataTypes.INT())).build() output_table = ( - env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector()) + env.from_table(input=table, key_selector=MyKeySelector()) .apply(agent) .to_table(schema=schema, output_type=output_type) ) diff --git a/python/flink_agents/e2e_tests/my_agent.py b/python/flink_agents/e2e_tests/my_agent.py index 1af739e..8b25a08 100644 --- a/python/flink_agents/e2e_tests/my_agent.py +++ b/python/flink_agents/e2e_tests/my_agent.py @@ -21,6 +21,7 @@ import time from typing import Any from pydantic import BaseModel +from pyflink.common import Row from flink_agents.api.agent import Agent from flink_agents.api.decorators import action, tool @@ -51,6 +52,7 @@ class ItemData(BaseModel): class MyEvent(Event): # noqa D101 value: Any + class DataStreamAgent(Agent): """Agent used for explaining integrating agents with DataStream. @@ -138,3 +140,30 @@ class TableAgent(Agent): content = input content["review"] += " second action" ctx.send_event(OutputEvent(output=content)) + + +class DataStreamToTableAgent(Agent): + """Agent used for explaining integrating agents from table to datastream. + + Because pemja will find action in this class when execute Agent, we can't + define this class directly in example.py for module name will be set + to __main__. + """ + + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): # noqa D102 + input = event.input + content = copy.deepcopy(input) + content.review += " first action" + ctx.send_event(MyEvent(value=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): # noqa D102 + input = event.value + content = input + content.review += " second action" + ctx.send_event( + OutputEvent(output=Row(**content.model_dump(exclude="memory_info"))) + ) diff --git a/python/flink_agents/runtime/local_execution_environment.py b/python/flink_agents/runtime/local_execution_environment.py index 87d4c1b..ebaf78c 100644 --- a/python/flink_agents/runtime/local_execution_environment.py +++ b/python/flink_agents/runtime/local_execution_environment.py @@ -41,13 +41,17 @@ class LocalAgentBuilder(AgentBuilder): __config: AgentConfiguration def __init__( - self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]], config: AgentConfiguration + self, + env: "LocalExecutionEnvironment", + input: List[Dict[str, Any]], + config: AgentConfiguration, ) -> None: """Init empty output list.""" self.__env = env self.__input = input self.__output = [] self.__config = config + def apply(self, agent: Agent) -> AgentBuilder: """Create local runner to execute given agent. @@ -130,7 +134,7 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment): self.__output.append(output) def from_datastream( - self, input: DataStream, key_selector : KeySelector | Callable | None = None + self, input: DataStream, key_selector: KeySelector | Callable | None = None ) -> AgentBuilder: """Set input DataStream of agent execution. @@ -142,7 +146,6 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment): def from_table( self, input: Table, - t_env: StreamTableEnvironment, key_selector: KeySelector | Callable | None = None, ) -> AgentBuilder: """Set input Table of agent execution. @@ -154,7 +157,7 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment): def create_instance( - env: StreamExecutionEnvironment, **kwargs: Dict[str, Any] + env: StreamExecutionEnvironment, t_env: StreamTableEnvironment, **kwargs: Any ) -> AgentsExecutionEnvironment: """Factory function to create a remote agents execution environment. @@ -162,6 +165,8 @@ def create_instance( ---------- env : StreamExecutionEnvironment Flink job execution environment. + t_env: StreamTableEnvironment + Flink job execution table environment. **kwargs : Dict[str, Any] The dict of parameters to configure the execution environment. diff --git a/python/flink_agents/runtime/remote_execution_environment.py b/python/flink_agents/runtime/remote_execution_environment.py index 7d649c3..878c527 100644 --- a/python/flink_agents/runtime/remote_execution_environment.py +++ b/python/flink_agents/runtime/remote_execution_environment.py @@ -66,6 +66,15 @@ class RemoteAgentBuilder(AgentBuilder): self.__config = config self.__resources = resources + @property + def t_env(self) -> StreamTableEnvironment: + """Get or crate table environment.""" + if self.__t_env is None: + self.__t_env = StreamTableEnvironment.create( + stream_execution_environment=self.__env + ) + return self.__t_env + def apply(self, agent: Agent) -> "AgentBuilder": """Set agent of execution environment. @@ -134,7 +143,7 @@ class RemoteAgentBuilder(AgentBuilder): Table Output Table of agent execution. """ - return self.__t_env.from_data_stream(self.to_datastream(output_type), schema) + return self.t_env.from_data_stream(self.to_datastream(output_type), schema) def to_list(self) -> List[Dict[str, Any]]: """Get output list of agent execution. @@ -149,18 +158,33 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment): """Implementation of AgentsExecutionEnvironment for execution with DataStream.""" __env: StreamExecutionEnvironment + __t_env: StreamTableEnvironment __config: AgentConfiguration - def __init__(self, env: StreamExecutionEnvironment) -> None: + def __init__( + self, + env: StreamExecutionEnvironment, + t_env: StreamTableEnvironment | None = None, + ) -> None: """Init method of RemoteExecutionEnvironment.""" super().__init__() self.__env = env + self.__t_env = t_env self.__config = AgentConfiguration() flink_conf_dir = os.environ.get("FLINK_CONF_DIR") if flink_conf_dir is not None: config_dir = Path(flink_conf_dir) / "config.yaml" self.__config.load_from_file(str(config_dir)) + @property + def t_env(self) -> StreamTableEnvironment: + """Get or crate table environment.""" + if self.__t_env is None: + self.__t_env = StreamTableEnvironment.create( + stream_execution_environment=self.__env + ) + return self.__t_env + def get_config(self, path: str | None = None) -> AgentConfiguration: """Get the writable configuration for flink agents. @@ -200,13 +224,15 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment): input = self.__process_input_datastream(input, key_selector) return RemoteAgentBuilder( - input=input, config=self.__config, resources=self.resources + input=input, + config=self.__config, + t_env=self.__t_env, + resources=self.resources, ) def from_table( self, input: Table, - t_env: StreamTableEnvironment, key_selector: KeySelector | Callable | None = None, ) -> AgentBuilder: """Set input Table of agent. @@ -215,18 +241,19 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment): ---------- input : Table Receive a Table as input. - t_env: StreamTableEnvironment - table environment supports convert table to/from datastream. key_selector : KeySelector Extract key from each input record. """ - input = t_env.to_data_stream(table=input) + input = self.t_env.to_data_stream(table=input) input = input.map(lambda x: x, output_type=PickledBytesTypeInfo()) input = self.__process_input_datastream(input, key_selector) return RemoteAgentBuilder( - input=input, config=self.__config, t_env=t_env, resources=self.resources + input=input, + config=self.__config, + t_env=self.t_env, + resources=self.resources, ) def from_list(self, input: List[Dict[str, Any]]) -> "AgentsExecutionEnvironment": @@ -243,7 +270,7 @@ class RemoteExecutionEnvironment(AgentsExecutionEnvironment): def create_instance( - env: StreamExecutionEnvironment, **kwargs: Dict[str, Any] + env: StreamExecutionEnvironment, t_env: StreamTableEnvironment, **kwargs: Any ) -> AgentsExecutionEnvironment: """Factory function to create a remote agents execution environment. @@ -251,6 +278,8 @@ def create_instance( ---------- env : StreamExecutionEnvironment Flink job execution environment. + t_env : StreamTableEnvironment + Flink job execution table environment. **kwargs : Dict[str, Any] The dict of parameters to configure the execution environment. @@ -259,4 +288,4 @@ def create_instance( AgentsExecutionEnvironment A configured agents execution environment instance. """ - return RemoteExecutionEnvironment(env=env) + return RemoteExecutionEnvironment(env=env, t_env=t_env)
