This is an automated email from the ASF dual-hosted git repository. wenjin272 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 269c08dd2a6b96a718b4e98db0697a92a0cae690 Author: WenjinXie <[email protected]> AuthorDate: Tue Apr 7 22:24:28 2026 +0800 [api] Extract resource context. --- python/flink_agents/api/chat_models/chat_model.py | 15 ++++--- .../api/embedding_models/embedding_model.py | 4 +- python/flink_agents/api/resource.py | 15 ++++--- python/flink_agents/api/resource_context.py | 36 +++++++++++++++ .../flink_agents/api/vector_stores/vector_store.py | 4 +- .../anthropic/tests/test_anthropic_chat_model.py | 12 ++++- .../azure/tests/test_azure_openai_chat_model.py | 12 ++++- .../openai/tests/test_openai_chat_model.py | 12 ++++- .../chat_models/tests/test_ollama_chat_model.py | 11 ++++- .../chat_models/tests/test_tongyi_chat_model.py | 11 ++++- .../local/tests/test_ollama_embedding_model.py | 7 ++- .../tests/test_openai_embedding_model.py | 7 ++- .../tests/test_tongyi_embedding_model.py | 26 ++++++++--- .../chroma/tests/test_chroma_vector_store.py | 28 +++++++++--- python/flink_agents/plan/agent_plan.py | 14 ++++-- python/flink_agents/plan/resource_provider.py | 31 ++++++++----- .../plan/tests/actions/test_chat_model_action.py | 24 +++++----- .../flink_agents/runtime/flink_runner_context.py | 4 ++ .../runtime/java/java_resource_wrapper.py | 4 +- python/flink_agents/runtime/local_runner.py | 4 ++ python/flink_agents/runtime/python_java_utils.py | 13 +++--- python/flink_agents/runtime/resource_cache.py | 8 +++- python/flink_agents/runtime/resource_context.py | 51 ++++++++++++++++++++++ .../runtime/tests/test_built_in_actions.py | 10 +++-- .../python/utils/PythonResourceAdapterImpl.java | 12 ++--- .../utils/PythonResourceAdapterImplTest.java | 4 +- 26 files changed, 299 insertions(+), 80 deletions(-) diff --git a/python/flink_agents/api/chat_models/chat_model.py b/python/flink_agents/api/chat_models/chat_model.py index bb3a5ebc..81608c79 100644 --- a/python/flink_agents/api/chat_models/chat_model.py +++ b/python/flink_agents/api/chat_models/chat_model.py @@ -22,7 +22,10 @@ from typing import Any, ClassVar, Dict, List, Sequence, Tuple, cast from pydantic import Field, PrivateAttr from typing_extensions import override -from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.chat_message import ( + ChatMessage, + MessageRole, +) from flink_agents.api.prompts.prompt import Prompt from flink_agents.api.resource import Resource, ResourceType from flink_agents.api.tools.tool import Tool @@ -157,20 +160,22 @@ class BaseChatModelSetup(Resource): def open(self) -> None: self._resolved_connection = cast( "BaseChatModelConnection", - self.get_resource(self.connection, ResourceType.CHAT_MODEL_CONNECTION), + self.resource_context.get_resource(self.connection, ResourceType.CHAT_MODEL_CONNECTION), ) if self.prompt is not None: if isinstance(self.prompt, str): # Get prompt resource if it's a string self.prompt = cast( - "Prompt", self.get_resource(self.prompt, ResourceType.PROMPT) + "Prompt", self.resource_context.get_resource(self.prompt, ResourceType.PROMPT) ) - if self.tools is not None: + + if len(self.tools) > 0: self.tools = [ - cast("Tool", self.get_resource(tool_name, ResourceType.TOOL)) + cast("Tool", self.resource_context.get_resource(tool_name, ResourceType.TOOL)) for tool_name in self.tools ] + def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatMessage: """Execute chat conversation. diff --git a/python/flink_agents/api/embedding_models/embedding_model.py b/python/flink_agents/api/embedding_models/embedding_model.py index b7088982..5529c817 100644 --- a/python/flink_agents/api/embedding_models/embedding_model.py +++ b/python/flink_agents/api/embedding_models/embedding_model.py @@ -93,7 +93,9 @@ class BaseEmbeddingModelSetup(Resource, ABC): def open(self) -> None: self.connection = cast( "BaseEmbeddingModelConnection", - self.get_resource(self.connection, ResourceType.EMBEDDING_MODEL_CONNECTION), + self.resource_context.get_resource( + self.connection, ResourceType.EMBEDDING_MODEL_CONNECTION + ), ) def _get_connection(self) -> BaseEmbeddingModelConnection: diff --git a/python/flink_agents/api/resource.py b/python/flink_agents/api/resource.py index 13155380..3b577233 100644 --- a/python/flink_agents/api/resource.py +++ b/python/flink_agents/api/resource.py @@ -18,9 +18,11 @@ import importlib from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, Any, Callable, Dict, Type +from typing import TYPE_CHECKING, Any, Dict, Type -from pydantic import BaseModel, Field, PrivateAttr, model_validator +from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator + +from flink_agents.api.resource_context import ResourceContext if TYPE_CHECKING: from flink_agents.api.metric_group import MetricGroup @@ -30,7 +32,7 @@ class ResourceType(Enum): """Type enum of resource. Currently, support chat_model, chat_model_server, tool, embedding_model, - vector_store and prompt. + vector_store, prompt, mcp_server. """ CHAT_MODEL = "chat_model" @@ -52,14 +54,13 @@ class Resource(BaseModel, ABC): Attributes: ---------- - get_resource : Callable[[str, ResourceType], "Resource"] + resource_context : ResourceContext Get other resource object declared in the same Agent. The first argument is resource name and the second argument is resource type. """ - get_resource: Callable[[str, ResourceType], "Resource"] = Field( - exclude=True, default=None - ) + model_config = ConfigDict(arbitrary_types_allowed=True) + resource_context: ResourceContext | None = Field(exclude=True, default=None) # The metric group bound to this resource, injected in RunnerContext#get_resource _metric_group: "MetricGroup | None" = PrivateAttr(default=None) diff --git a/python/flink_agents/api/resource_context.py b/python/flink_agents/api/resource_context.py new file mode 100644 index 00000000..d0508508 --- /dev/null +++ b/python/flink_agents/api/resource_context.py @@ -0,0 +1,36 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""Public ResourceContext interface. + +Defines the capabilities available to a Resource during execution. +The concrete implementation lives in :mod:`flink_agents.runtime.resource_context`. +""" + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from flink_agents.api.resource import Resource, ResourceType + + +class ResourceContext(ABC): + """Base abstract class for Resource Context.""" + + @abstractmethod + def get_resource(self, name: str, resource_type: "ResourceType") -> "Resource": + """Get another resource declared in the same Agent.""" diff --git a/python/flink_agents/api/vector_stores/vector_store.py b/python/flink_agents/api/vector_stores/vector_store.py index 9bfd0f9c..d16d00c7 100644 --- a/python/flink_agents/api/vector_stores/vector_store.py +++ b/python/flink_agents/api/vector_stores/vector_store.py @@ -224,7 +224,9 @@ class BaseVectorStore(Resource, ABC): if self.embedding_model is not None: self.embedding_model = cast( "BaseEmbeddingModelSetup", - self.get_resource(self.embedding_model, ResourceType.EMBEDDING_MODEL), + self.resource_context.get_resource( + self.embedding_model, ResourceType.EMBEDDING_MODEL + ), ) def _get_embedding_model(self) -> BaseEmbeddingModelSetup: diff --git a/python/flink_agents/integrations/chat_models/anthropic/tests/test_anthropic_chat_model.py b/python/flink_agents/integrations/chat_models/anthropic/tests/test_anthropic_chat_model.py index e6b0acb2..767cb6a4 100644 --- a/python/flink_agents/integrations/chat_models/anthropic/tests/test_anthropic_chat_model.py +++ b/python/flink_agents/integrations/chat_models/anthropic/tests/test_anthropic_chat_model.py @@ -16,11 +16,13 @@ # limitations under the License. ################################################################################# import os +from unittest.mock import MagicMock import pytest from flink_agents.api.chat_message import ChatMessage, MessageRole from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.integrations.chat_models.anthropic.anthropic_chat_model import ( AnthropicChatModelConnection, AnthropicChatModelSetup, @@ -41,11 +43,14 @@ def test_anthropic_chat_model() -> None: else: return get_resource(name, ResourceType.TOOL) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + chat_model = AnthropicChatModelSetup( name="anthropic", model=test_model, connection="anthropic_server", - get_resource=get_resource, + resource_context=mock_ctx, ) response = chat_model.chat([ChatMessage(role=MessageRole.USER, content="Hello!")]) assert response is not None @@ -80,12 +85,15 @@ def test_anthropic_chat_with_tools() -> None: else: return from_callable(func=add) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + chat_model = AnthropicChatModelSetup( name="anthropic", model=test_model, connection="anthropic_server", tools=["add"], - get_resource=get_resource, + resource_context=mock_ctx, ) response = chat_model.chat( [ChatMessage(role=MessageRole.USER, content="What is 1 + 1?")] diff --git a/python/flink_agents/integrations/chat_models/azure/tests/test_azure_openai_chat_model.py b/python/flink_agents/integrations/chat_models/azure/tests/test_azure_openai_chat_model.py index 58d4a5c0..95172a67 100644 --- a/python/flink_agents/integrations/chat_models/azure/tests/test_azure_openai_chat_model.py +++ b/python/flink_agents/integrations/chat_models/azure/tests/test_azure_openai_chat_model.py @@ -16,11 +16,13 @@ # limitations under the License. ################################################################################# import os +from unittest.mock import MagicMock import pytest from flink_agents.api.chat_message import ChatMessage, MessageRole from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.integrations.chat_models.azure.azure_openai_chat_model import ( AzureOpenAIChatModelConnection, AzureOpenAIChatModelSetup, @@ -48,11 +50,14 @@ def test_azure_openai_chat_model() -> None: else: return get_resource(name, ResourceType.TOOL) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + chat_model = AzureOpenAIChatModelSetup( name="azure_openai", model=test_deployment, connection="azure_openai", - get_resource=get_resource, + resource_context=mock_ctx, ) response = chat_model.chat([ChatMessage(role=MessageRole.USER, content="Hello!")]) assert response is not None @@ -92,12 +97,15 @@ def test_azure_openai_chat_with_tools() -> None: else: return from_callable(func=add) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + chat_model = AzureOpenAIChatModelSetup( name="azure_openai", model=test_deployment, connection="azure_openai", tools=["add"], - get_resource=get_resource, + resource_context=mock_ctx, ) response = chat_model.chat( [ diff --git a/python/flink_agents/integrations/chat_models/openai/tests/test_openai_chat_model.py b/python/flink_agents/integrations/chat_models/openai/tests/test_openai_chat_model.py index 15249a9c..3c893ecb 100644 --- a/python/flink_agents/integrations/chat_models/openai/tests/test_openai_chat_model.py +++ b/python/flink_agents/integrations/chat_models/openai/tests/test_openai_chat_model.py @@ -16,11 +16,13 @@ # limitations under the License. ################################################################################# import os +from unittest.mock import MagicMock import pytest from flink_agents.api.chat_message import ChatMessage, MessageRole from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.integrations.chat_models.openai.openai_chat_model import ( OpenAIChatModelConnection, OpenAIChatModelSetup, @@ -44,8 +46,11 @@ def test_openai_chat_model() -> None: else: return get_resource(name, ResourceType.TOOL) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + chat_model = OpenAIChatModelSetup( - name="openai", model=test_model, connection="openai", get_resource=get_resource + name="openai", model=test_model, connection="openai", resource_context=mock_ctx ) response = chat_model.chat([ChatMessage(role=MessageRole.USER, content="Hello!")]) assert response is not None @@ -82,12 +87,15 @@ def test_openai_chat_with_tools() -> None: else: return from_callable(func=add) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + chat_model = OpenAIChatModelSetup( name="openai", model=test_model, connection="openai", tools=["add"], - get_resource=get_resource, + resource_context=mock_ctx, ) response = chat_model.chat( [ChatMessage(role=MessageRole.USER, content="What is 377 + 688?")] diff --git a/python/flink_agents/integrations/chat_models/tests/test_ollama_chat_model.py b/python/flink_agents/integrations/chat_models/tests/test_ollama_chat_model.py index 34702d3a..e56fc0fd 100644 --- a/python/flink_agents/integrations/chat_models/tests/test_ollama_chat_model.py +++ b/python/flink_agents/integrations/chat_models/tests/test_ollama_chat_model.py @@ -26,6 +26,7 @@ from ollama import Client from flink_agents.api.chat_message import ChatMessage, MessageRole from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.integrations.chat_models.ollama_chat_model import ( OllamaChatModelConnection, OllamaChatModelSetup, @@ -104,12 +105,15 @@ def test_ollama_chat_with_tools() -> None: else: return connection + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + llm = OllamaChatModelSetup( name="ollama", connection="ollama", model=test_model, tools=["add"], - get_resource=get_resource, + resource_context=mock_ctx, ) llm.open() @@ -170,12 +174,15 @@ def test_ollama_chat_with_extract_reasoning() -> None: def get_resource(name: str, type: ResourceType) -> Resource: return connection + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + llm = OllamaChatModelSetup( name="ollama", connection="ollama", model=test_model, extract_reasoning=True, - get_resource=get_resource, + resource_context=mock_ctx, ) llm.open() diff --git a/python/flink_agents/integrations/chat_models/tests/test_tongyi_chat_model.py b/python/flink_agents/integrations/chat_models/tests/test_tongyi_chat_model.py index 39fb1578..b6be7404 100644 --- a/python/flink_agents/integrations/chat_models/tests/test_tongyi_chat_model.py +++ b/python/flink_agents/integrations/chat_models/tests/test_tongyi_chat_model.py @@ -23,6 +23,7 @@ import pytest from flink_agents.api.chat_message import ChatMessage, MessageRole from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.integrations.chat_models.tongyi_chat_model import ( TongyiChatModelConnection, TongyiChatModelSetup, @@ -80,12 +81,15 @@ def test_tongyi_chat_with_tools() -> None: else: return connection + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + llm = TongyiChatModelSetup( name="tongyi", model=test_model, connection="tongyi", tools=["add"], - get_resource=get_resource, + resource_context=mock_ctx, ) llm.open() @@ -145,12 +149,15 @@ def test_tongyi_chat_with_extract_reasoning(monkeypatch: pytest.MonkeyPatch) -> def get_resource(name: str, type: ResourceType) -> Resource: return connection + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + llm = TongyiChatModelSetup( name="tongyi", model=test_model, connection="tongyi", extract_reasoning=True, - get_resource=get_resource, + resource_context=mock_ctx, ) llm.open() diff --git a/python/flink_agents/integrations/embedding_models/local/tests/test_ollama_embedding_model.py b/python/flink_agents/integrations/embedding_models/local/tests/test_ollama_embedding_model.py index e275a97f..ab983cfa 100644 --- a/python/flink_agents/integrations/embedding_models/local/tests/test_ollama_embedding_model.py +++ b/python/flink_agents/integrations/embedding_models/local/tests/test_ollama_embedding_model.py @@ -19,11 +19,13 @@ import os import subprocess import sys from pathlib import Path +from unittest.mock import MagicMock import pytest from ollama import Client from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.integrations.embedding_models.local.ollama_embedding_model import ( OllamaEmbeddingModelConnection, OllamaEmbeddingModelSetup, @@ -66,12 +68,15 @@ def test_ollama_embedding_setup() -> None: def get_resource(name: str, type: ResourceType) -> Resource: return connection + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + setup = OllamaEmbeddingModelSetup( name="embeddings", connection="ollama_embed", model=test_model, truncate=True, - get_resource=get_resource, + resource_context=mock_ctx, ) # Test embedding through setup diff --git a/python/flink_agents/integrations/embedding_models/tests/test_openai_embedding_model.py b/python/flink_agents/integrations/embedding_models/tests/test_openai_embedding_model.py index c1a736f2..75382035 100644 --- a/python/flink_agents/integrations/embedding_models/tests/test_openai_embedding_model.py +++ b/python/flink_agents/integrations/embedding_models/tests/test_openai_embedding_model.py @@ -16,10 +16,12 @@ # limitations under the License. ################################################################################ import os +from unittest.mock import MagicMock import pytest from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.integrations.embedding_models.openai_embedding_model import ( OpenAIEmbeddingModelConnection, OpenAIEmbeddingModelSetup, @@ -40,8 +42,11 @@ def test_openai_embedding_model() -> None: msg = f"Unknown resource type: {type}" raise ValueError(msg) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + embedding_model = OpenAIEmbeddingModelSetup( - name="openai", model=test_model, connection="openai", get_resource=get_resource + name="openai", model=test_model, connection="openai", resource_context=mock_ctx ) response = embedding_model.embed("Hello, Flink Agent!") diff --git a/python/flink_agents/integrations/embedding_models/tests/test_tongyi_embedding_model.py b/python/flink_agents/integrations/embedding_models/tests/test_tongyi_embedding_model.py index 54c59578..75e47706 100644 --- a/python/flink_agents/integrations/embedding_models/tests/test_tongyi_embedding_model.py +++ b/python/flink_agents/integrations/embedding_models/tests/test_tongyi_embedding_model.py @@ -23,6 +23,7 @@ from unittest.mock import MagicMock import pytest from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.integrations.embedding_models.tongyi_embedding_model import ( TongyiEmbeddingModelConnection, TongyiEmbeddingModelSetup, @@ -32,6 +33,12 @@ test_model = os.environ.get("TONGYI_EMBEDDING_MODEL", "text-embedding-v4") api_key_available = "DASHSCOPE_API_KEY" in os.environ +def _make_ctx(get_resource) -> ResourceContext: + ctx = MagicMock(spec=ResourceContext) + ctx.get_resource = get_resource + return ctx + + @pytest.mark.skipif(not api_key_available, reason="DashScope API key is not set") def test_tongyi_embedding_model() -> None: """Test basic embedding functionality of TongyiEmbeddingModelConnection.""" @@ -45,7 +52,10 @@ def test_tongyi_embedding_model() -> None: raise ValueError(msg) embedding_model = TongyiEmbeddingModelSetup( - name="tongyi", model=test_model, connection="tongyi", get_resource=get_resource + name="tongyi", + model=test_model, + connection="tongyi", + resource_context=_make_ctx(get_resource), ) embedding_model.open() @@ -75,7 +85,7 @@ def test_tongyi_embedding_with_text_type() -> None: model=test_model, connection="tongyi", text_type="query", - get_resource=get_resource, + resource_context=_make_ctx(get_resource), ) embedding_model_query.open() @@ -89,7 +99,7 @@ def test_tongyi_embedding_with_text_type() -> None: model=test_model, connection="tongyi", text_type="document", - get_resource=get_resource, + resource_context=_make_ctx(get_resource), ) embedding_model_doc.open() @@ -129,7 +139,10 @@ def test_tongyi_embedding_mock(monkeypatch: pytest.MonkeyPatch) -> None: raise ValueError(msg) embedding_model = TongyiEmbeddingModelSetup( - name="tongyi", model=test_model, connection="tongyi", get_resource=get_resource + name="tongyi", + model=test_model, + connection="tongyi", + resource_context=_make_ctx(get_resource), ) embedding_model.open() @@ -175,7 +188,10 @@ def test_tongyi_embedding_batch_mock(monkeypatch: pytest.MonkeyPatch) -> None: raise ValueError(msg) embedding_model = TongyiEmbeddingModelSetup( - name="tongyi", model=test_model, connection="tongyi", get_resource=get_resource + name="tongyi", + model=test_model, + connection="tongyi", + resource_context=_make_ctx(get_resource), ) embedding_model.open() diff --git a/python/flink_agents/integrations/vector_stores/chroma/tests/test_chroma_vector_store.py b/python/flink_agents/integrations/vector_stores/chroma/tests/test_chroma_vector_store.py index 0fa9f775..754798cc 100644 --- a/python/flink_agents/integrations/vector_stores/chroma/tests/test_chroma_vector_store.py +++ b/python/flink_agents/integrations/vector_stores/chroma/tests/test_chroma_vector_store.py @@ -17,10 +17,13 @@ ################################################################################ import os from typing import Any, Dict, List +from unittest.mock import MagicMock import pytest from chromadb.errors import NotFoundError +from flink_agents.api.resource_context import ResourceContext + try: import chromadb # noqa: F401 @@ -96,11 +99,14 @@ def test_local_chroma_vector_store() -> None: msg = f"Unknown resource type: {resource_type}" raise ValueError(msg) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + vector_store = ChromaVectorStore( name="chroma_vector_store", embedding_model="mock_embeddings", collection="test_collection", - get_resource=get_resource, + resource_context=mock_ctx, ) vector_store.open() @@ -127,11 +133,14 @@ def test_collection_management() -> None: msg = f"Unknown resource type: {resource_type}" raise ValueError(msg) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + vector_store = ChromaVectorStore( name="chroma_vector_store", embedding_model="mock_embeddings", collection="test_collection", - get_resource=get_resource, + resource_context=mock_ctx, ) vector_store.open() @@ -159,11 +168,14 @@ def test_document_management() -> None: msg = f"Unknown resource type: {resource_type}" raise ValueError(msg) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + vector_store = ChromaVectorStore( name="chroma_vector_store", embedding_model="mock_embeddings", collection="test_collection", - get_resource=get_resource, + resource_context=mock_ctx, ) vector_store.open() @@ -261,11 +273,14 @@ def _filter_test_store() -> ChromaVectorStore: msg = f"Unknown resource type: {resource_type}" raise ValueError(msg) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + vector_store = ChromaVectorStore( name="chroma_vector_store", embedding_model="mock_embeddings", collection="filter_tests", - get_resource=get_resource, + resource_context=mock_ctx, ) vector_store.open() vector_store.create_collection_if_not_exists(name="filter_tests") @@ -436,6 +451,9 @@ def test_cloud_chroma_vector_store() -> None: msg = f"Unknown resource type: {resource_type}" raise ValueError(msg) + mock_ctx = MagicMock(spec=ResourceContext) + mock_ctx.get_resource = get_resource + vector_store = ChromaVectorStore( name="chroma_vector_store", embedding_model="mock_embeddings", @@ -443,7 +461,7 @@ def test_cloud_chroma_vector_store() -> None: api_key=api_key, tenant=tenant, database=database, - get_resource=get_resource, + resource_context=mock_ctx, ) vector_store.open() diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py index b737cb0e..40490e45 100644 --- a/python/flink_agents/plan/agent_plan.py +++ b/python/flink_agents/plan/agent_plan.py @@ -20,7 +20,12 @@ from typing import TYPE_CHECKING, Any, Dict, List, cast from pydantic import BaseModel, field_serializer, model_validator from flink_agents.api.agents.agent import Agent -from flink_agents.api.resource import ResourceDescriptor, ResourceType +from flink_agents.api.resource import ( + Resource, + ResourceDescriptor, + ResourceType, +) +from flink_agents.api.resource_context import ResourceContext from flink_agents.plan.actions.action import Action from flink_agents.plan.actions.chat_model_action import CHAT_MODEL_ACTION from flink_agents.plan.actions.context_retrieval_action import CONTEXT_RETRIEVAL_ACTION @@ -351,11 +356,14 @@ def _add_mcp_server( resource_providers.append(provider) - def get_resource(name: str, type: ResourceType) -> Any: + class ResourceContextPlaceholder(ResourceContext): """Placeholder - MCP server construction doesn't need resource resolution.""" + + def get_resource(self, name: str, resource_type: "ResourceType") -> "Resource": + pass mcp_server = cast( - "MCPServer", provider.provide(get_resource=get_resource, config=config) + "MCPServer", provider.provide(resource_context=ResourceContextPlaceholder(), config=config) ) resource_providers.extend( diff --git a/python/flink_agents/plan/resource_provider.py b/python/flink_agents/plan/resource_provider.py index d8e00e68..360e5fda 100644 --- a/python/flink_agents/plan/resource_provider.py +++ b/python/flink_agents/plan/resource_provider.py @@ -17,7 +17,6 @@ ################################################################################# import importlib from abc import ABC, abstractmethod -from collections.abc import Callable from typing import Any, Dict from pydantic import BaseModel, Field @@ -29,6 +28,7 @@ from flink_agents.api.resource import ( SerializableResource, get_resource_class, ) +from flink_agents.api.resource_context import ResourceContext from flink_agents.plan.configuration import AgentConfiguration @@ -48,13 +48,15 @@ class ResourceProvider(BaseModel, ABC): type: ResourceType @abstractmethod - def provide(self, get_resource: Callable, config: AgentConfiguration) -> Resource: + def provide( + self, resource_context: ResourceContext, config: AgentConfiguration + ) -> Resource: """Create resource in runtime. Parameters ---------- - get_resource : Callable - The helper function to get other resource declared in the same Agent. + resource_context : ResourceContext + The context for the resource. config : AgentConfiguration Configuration for Flink Agents. @@ -102,7 +104,9 @@ class PythonResourceProvider(ResourceProvider): descriptor=descriptor, ) - def provide(self, get_resource: Callable, config: AgentConfiguration) -> Resource: + def provide( + self, resource_context: ResourceContext, config: AgentConfiguration + ) -> Resource: """Create resource in runtime.""" cls = self.descriptor.clazz @@ -113,7 +117,7 @@ class PythonResourceProvider(ResourceProvider): final_kwargs.update(resource_class_config) final_kwargs.update(self.descriptor.arguments) - resource = cls(**final_kwargs, get_resource=get_resource) + resource = cls(**final_kwargs, resource_context=resource_context) return resource @@ -145,11 +149,14 @@ class PythonSerializableResourceProvider(SerializableResourceProvider): resource=resource, ) - def provide(self, get_resource: Callable, config: AgentConfiguration) -> Resource: + def provide( + self, resource_context: ResourceContext, config: AgentConfiguration + ) -> Resource: """Get or deserialize resource in runtime.""" if self.resource is None: module = importlib.import_module(self.module) clazz = getattr(module, self.clazz) + self.serialized["resource_context"] = resource_context self.resource = clazz.model_validate(self.serialized) return self.resource @@ -190,7 +197,9 @@ class JavaResourceProvider(ResourceProvider): descriptor=descriptor, ) - def provide(self, get_resource: Callable, config: AgentConfiguration) -> Resource: + def provide( + self, resource_context: ResourceContext, config: AgentConfiguration + ) -> Resource: """Create resource in runtime.""" if not self._j_resource_adapter: err_msg = "java resource adapter is not set" @@ -207,7 +216,7 @@ class JavaResourceProvider(ResourceProvider): return cls( **kwargs, - get_resource=get_resource, + resource_context=resource_context, j_resource=j_resource, j_resource_adapter=self._j_resource_adapter, ) @@ -224,7 +233,9 @@ class JavaSerializableResourceProvider(SerializableResourceProvider): Currently, this class only used for deserializing Java agent plan json """ - def provide(self, get_resource: Callable, config: AgentConfiguration) -> Resource: + def provide( + self, resource_context: ResourceContext, config: AgentConfiguration + ) -> Resource: """Get or deserialize resource in runtime.""" err_msg = ( "Currently, flink-agents doesn't support create resource " diff --git a/python/flink_agents/plan/tests/actions/test_chat_model_action.py b/python/flink_agents/plan/tests/actions/test_chat_model_action.py index 45f5944e..72b223d2 100644 --- a/python/flink_agents/plan/tests/actions/test_chat_model_action.py +++ b/python/flink_agents/plan/tests/actions/test_chat_model_action.py @@ -19,36 +19,36 @@ from flink_agents.plan.actions.chat_model_action import _clean_llm_response def test_clean_llm_response_with_json_block(): - input_str = "```json\n{\"key\": \"value\"}\n```" - expected = "{\"key\": \"value\"}" + input_str = '```json\n{"key": "value"}\n```' + expected = '{"key": "value"}' assert _clean_llm_response(input_str) == expected def test_clean_llm_response_with_generic_code_block(): - input_str = "```\n{\"key\": \"value\"}\n```" - expected = "{\"key\": \"value\"}" + input_str = '```\n{"key": "value"}\n```' + expected = '{"key": "value"}' assert _clean_llm_response(input_str) == expected def test_clean_llm_response_with_whitespace(): - input_str = " ```json\n{\"key\": \"value\"}\n``` " - expected = "{\"key\": \"value\"}" + input_str = ' ```json\n{"key": "value"}\n``` ' + expected = '{"key": "value"}' assert _clean_llm_response(input_str) == expected def test_clean_llm_response_without_block(): - input_str = "{\"key\": \"value\"}" - expected = "{\"key\": \"value\"}" + input_str = '{"key": "value"}' + expected = '{"key": "value"}' assert _clean_llm_response(input_str) == expected def test_clean_llm_response_with_text_around(): - input_str = "Here is the json: ```json {\"key\": \"value\"} ```" - expected = "Here is the json: ```json {\"key\": \"value\"} ```" + input_str = 'Here is the json: ```json {"key": "value"} ```' + expected = 'Here is the json: ```json {"key": "value"} ```' assert _clean_llm_response(input_str) == expected def test_clean_llm_response_with_multiple_lines_in_block(): - input_str = "```json\n{\n \"key\": \"value\"\n}\n```" - expected = "{\n \"key\": \"value\"\n}" + input_str = '```json\n{\n "key": "value"\n}\n```' + expected = '{\n "key": "value"\n}' assert _clean_llm_response(input_str) == expected diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py index ff5fa58e..56b39a19 100644 --- a/python/flink_agents/runtime/flink_runner_context.py +++ b/python/flink_agents/runtime/flink_runner_context.py @@ -53,6 +53,7 @@ from flink_agents.runtime.memory.mem0.mem0_long_term_memory import ( ) from flink_agents.runtime.python_java_utils import _build_event_log_string from flink_agents.runtime.resource_cache import ResourceCache +from flink_agents.runtime.resource_context import ResourceContextImpl logger = logging.getLogger(__name__) @@ -274,6 +275,9 @@ class FlinkRunnerContext(RunnerContext): self.__agent_plan.resource_providers, self.__agent_plan.config ) self.__resource_cache.set_java_resource_adapter(j_resource_adapter) + self.__resource_cache.set_resource_context( + ResourceContextImpl(self.__resource_cache) + ) self.executor = executor def set_long_term_memory(self, ltm: InternalBaseLongTermMemory) -> None: diff --git a/python/flink_agents/runtime/java/java_resource_wrapper.py b/python/flink_agents/runtime/java/java_resource_wrapper.py index 8b2a9296..579e9141 100644 --- a/python/flink_agents/runtime/java/java_resource_wrapper.py +++ b/python/flink_agents/runtime/java/java_resource_wrapper.py @@ -23,6 +23,7 @@ from typing_extensions import override from flink_agents.api.chat_message import ChatMessage, MessageRole from flink_agents.api.prompts.prompt import Prompt from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.api.tools.tool import Tool, ToolType @@ -78,13 +79,14 @@ class JavaPrompt(Prompt): self.j_prompt.close() -class JavaGetResourceWrapper: +class JavaResourceContextWrapper(ResourceContext): """Python wrapper for Java ResourceAdapter.""" def __init__(self, j_resource_adapter: Any) -> None: """Initialize with a Java ResourceAdapter.""" self._j_resource_adapter = j_resource_adapter + @override def get_resource(self, name: str, type: ResourceType) -> Resource: """Get a resource by name and type.""" return self._j_resource_adapter.getResource(name, type.value) diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py index 84402e44..c597e84f 100644 --- a/python/flink_agents/runtime/local_runner.py +++ b/python/flink_agents/runtime/local_runner.py @@ -35,6 +35,7 @@ from flink_agents.plan.configuration import AgentConfiguration from flink_agents.runtime.agent_runner import AgentRunner from flink_agents.runtime.local_memory_object import LocalMemoryObject from flink_agents.runtime.resource_cache import ResourceCache +from flink_agents.runtime.resource_context import ResourceContextImpl if TYPE_CHECKING: from flink_agents.plan.agent_plan import AgentPlan @@ -87,6 +88,9 @@ class LocalRunnerContext(RunnerContext): self.__resource_cache = ResourceCache( agent_plan.resource_providers, agent_plan.config ) + self.__resource_cache.set_resource_context( + ResourceContextImpl(self.__resource_cache) + ) self.__key = key self.events = deque() self._sensory_mem_store = {} diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py index ae30a539..169ebf02 100644 --- a/python/flink_agents/runtime/python_java_utils.py +++ b/python/flink_agents/runtime/python_java_utils.py @@ -18,7 +18,7 @@ import importlib import json import typing -from typing import Any, Callable, Dict +from typing import Any, Dict import cloudpickle @@ -38,8 +38,8 @@ from flink_agents.api.vector_stores.vector_store import ( ) from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING from flink_agents.runtime.java.java_resource_wrapper import ( - JavaGetResourceWrapper, JavaPrompt, + JavaResourceContextWrapper, JavaTool, ) @@ -93,16 +93,17 @@ def create_resource( return cls(**func_kwargs) -def get_resource_function(j_resource_adapter: Any) -> Callable: - """Create a callable wrapper for Java resource adapter. +def get_resource_context(j_resource_adapter: Any) -> JavaResourceContextWrapper: + """Create a ResourceContext wrapper for Java resource adapter. Args: j_resource_adapter: Java resource adapter object Returns: - Callable: A Python callable that wraps the Java resource adapter + JavaResourceContextWrapper: A ResourceContext that wraps the + Java resource adapter """ - return JavaGetResourceWrapper(j_resource_adapter).get_resource + return JavaResourceContextWrapper(j_resource_adapter) def from_java_tool(j_tool: Any) -> JavaTool: diff --git a/python/flink_agents/runtime/resource_cache.py b/python/flink_agents/runtime/resource_cache.py index cc4d32a7..0e7ded88 100644 --- a/python/flink_agents/runtime/resource_cache.py +++ b/python/flink_agents/runtime/resource_cache.py @@ -18,6 +18,7 @@ from typing import Any, Dict from flink_agents.api.resource import Resource, ResourceType +from flink_agents.api.resource_context import ResourceContext from flink_agents.plan.configuration import AgentConfiguration from flink_agents.plan.resource_provider import JavaResourceProvider, ResourceProvider @@ -51,6 +52,11 @@ class ResourceCache: self._config = config self._cache: Dict[ResourceType, Dict[str, Resource]] = {} self._j_resource_adapter: Any = None + self._resource_context = None + + def set_resource_context(self, resource_context: ResourceContext) -> None: + """Set the resource context for accessing other resource in runtime.""" + self._resource_context = resource_context def set_java_resource_adapter(self, j_resource_adapter: Any) -> None: """Set Java resource adapter for Java resource providers.""" @@ -77,7 +83,7 @@ class ResourceCache: if isinstance(resource_provider, JavaResourceProvider): resource_provider.set_java_resource_adapter(self._j_resource_adapter) resource = resource_provider.provide( - get_resource=self.get_resource, config=self._config + resource_context=self._resource_context, config=self._config ) resource.open() self._cache.setdefault(type, {})[name] = resource diff --git a/python/flink_agents/runtime/resource_context.py b/python/flink_agents/runtime/resource_context.py new file mode 100644 index 00000000..da6a68c9 --- /dev/null +++ b/python/flink_agents/runtime/resource_context.py @@ -0,0 +1,51 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""Runtime implementation of ResourceContext.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +from flink_agents.api.resource import ResourceType +from flink_agents.api.resource_context import ResourceContext +from flink_agents.plan.agent_plan import SKILLS_CONFIG +from flink_agents.runtime.skill.skill_manager import SkillManager + +if TYPE_CHECKING: + from flink_agents.api.resource import Resource + from flink_agents.api.skills import Skills + from flink_agents.runtime.resource_cache import ResourceCache + + +class ResourceContextImpl(ResourceContext): + """Concrete ResourceContext backed by AgentPlan. + + Creates the SkillManager lazily from the Skills config resource. + ``get_skill_manager()`` is only visible to runtime-internal code + (e.g., skill tools); it is NOT part of the public + :class:`ResourceContext` interface. + """ + + def __init__(self, resource_cache: ResourceCache) -> None: + """Initialize with the backing AgentPlan.""" + self._resource_cache = resource_cache + self._skill_manager: SkillManager | None = None + self._skill_manager_initialized = False + + def get_resource(self, name: str, resource_type: ResourceType) -> Resource: + """Get another resource declared in the same Agent.""" + return self._resource_cache.get_resource(name, resource_type) diff --git a/python/flink_agents/runtime/tests/test_built_in_actions.py b/python/flink_agents/runtime/tests/test_built_in_actions.py index 07258a35..52dc6896 100644 --- a/python/flink_agents/runtime/tests/test_built_in_actions.py +++ b/python/flink_agents/runtime/tests/test_built_in_actions.py @@ -84,13 +84,17 @@ class MockChatModel(BaseChatModelSetup): def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatMessage: """Execute chat conversation.""" # Get model connection - server = self.get_resource(self.connection, ResourceType.CHAT_MODEL_CONNECTION) + server = self.resource_context.get_resource( + self.connection, ResourceType.CHAT_MODEL_CONNECTION + ) # Apply prompt template if self.prompt is not None: if isinstance(self.prompt, str): # Get prompt resource if it's a string - prompt = self.get_resource(self.prompt, ResourceType.PROMPT) + prompt = self.resource_context.get_resource( + self.prompt, ResourceType.PROMPT + ) else: prompt = self.prompt @@ -106,7 +110,7 @@ class MockChatModel(BaseChatModelSetup): tools = None if self.tools is not None: tools = [ - self.get_resource(tool_name, ResourceType.TOOL) + self.resource_context.get_resource(tool_name, ResourceType.TOOL) for tool_name in self.tools ] diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java index 6c2c8757..442045ae 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java @@ -46,11 +46,11 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { static final String JAVA_RESOURCE_ADAPTER = "j_resource_adapter"; - static final String GET_RESOURCE_KEY = "get_resource"; + static final String RESOURCE_CONTEXT_KEY = "resource_context"; static final String PYTHON_MODULE_PREFIX = "python_java_utils."; - static final String GET_RESOURCE_FUNCTION = PYTHON_MODULE_PREFIX + "get_resource_function"; + static final String GET_RESOURCE_CONTEXT = PYTHON_MODULE_PREFIX + "get_resource_context"; static final String CALL_METHOD = PYTHON_MODULE_PREFIX + "call_method"; @@ -76,7 +76,7 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { private final BiFunction<String, ResourceType, Resource> getResource; private final PythonInterpreter interpreter; private final JavaResourceAdapter javaResourceAdapter; - private Object pythonGetResourceFunction; + private PyObject pythonResourceContext; public PythonResourceAdapterImpl( BiFunction<String, ResourceType, Resource> getResource, @@ -89,7 +89,7 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { public void open() { interpreter.exec(PYTHON_IMPORTS); - pythonGetResourceFunction = interpreter.invoke(GET_RESOURCE_FUNCTION, this); + pythonResourceContext = (PyObject) interpreter.invoke(GET_RESOURCE_CONTEXT, this); } public Object getResource(String resourceName, String resourceType) { @@ -112,13 +112,13 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { Map<String, Object> kwargs = new HashMap<>(); kwargs.put(JAVA_RESOURCE, resource); kwargs.put(JAVA_RESOURCE_ADAPTER, javaResourceAdapter); - kwargs.put(GET_RESOURCE_KEY, pythonGetResourceFunction); + kwargs.put(RESOURCE_CONTEXT_KEY, pythonResourceContext); return interpreter.invoke(FROM_JAVA_RESOURCE, resourceType, kwargs); } @Override public PyObject initPythonResource(String module, String clazz, Map<String, Object> kwargs) { - kwargs.put(GET_RESOURCE_KEY, pythonGetResourceFunction); + kwargs.put(RESOURCE_CONTEXT_KEY, pythonResourceContext); return (PyObject) interpreter.invoke(CREATE_RESOURCE, module, clazz, kwargs); } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java index 282c2ae1..1aba02f1 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java @@ -75,7 +75,7 @@ public class PythonResourceAdapterImplTest { PyObject result = pythonResourceAdapter.initPythonResource(module, clazz, kwargs); assertThat(result).isEqualTo(expectedResult); - assertThat(kwargs).containsKey(PythonResourceAdapterImpl.GET_RESOURCE_KEY); + assertThat(kwargs).containsKey(PythonResourceAdapterImpl.RESOURCE_CONTEXT_KEY); verify(mockInterpreter) .invoke(PythonResourceAdapterImpl.CREATE_RESOURCE, module, clazz, kwargs); } @@ -87,7 +87,7 @@ public class PythonResourceAdapterImplTest { verify(mockInterpreter).exec(PythonResourceAdapterImpl.PYTHON_IMPORTS); verify(mockInterpreter) - .invoke(PythonResourceAdapterImpl.GET_RESOURCE_FUNCTION, pythonResourceAdapter); + .invoke(PythonResourceAdapterImpl.GET_RESOURCE_CONTEXT, pythonResourceAdapter); } @Test
