This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 01362a6534ed2e73c7b730cc62c7c87018e12348 Author: youjin <[email protected]> AuthorDate: Mon Jan 12 14:03:51 2026 +0800 [feature] Support Java resource retrieval through the Python resource using the get_resource function --- .../pom.xml | 5 +++ .../resource/test/ChatModelCrossLanguageAgent.java | 39 ++++++++++++++++++---- python/flink_agents/runtime/python_java_utils.py | 24 ++++++++++++- .../runtime/operator/ActionExecutionOperator.java | 3 +- .../python/utils/PythonResourceAdapterImpl.java | 35 +++++++++++++++---- .../utils/PythonResourceAdapterImplTest.java | 11 +++--- 6 files changed, 99 insertions(+), 18 deletions(-) diff --git a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml index c93c37f..f06b302 100644 --- a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml +++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml @@ -25,6 +25,11 @@ <artifactId>flink-agents-runtime</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-agents-integrations-chat-models-ollama</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> diff --git a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java index 251492e..6712ee5 100644 --- a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java +++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/ChatModelCrossLanguageAgent.java @@ -35,6 +35,7 @@ import org.apache.flink.agents.api.event.ChatRequestEvent; import org.apache.flink.agents.api.event.ChatResponseEvent; import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection; import java.util.Collections; import java.util.List; @@ -64,7 +65,15 @@ public class ChatModelCrossLanguageAgent extends Agent { public static final String OLLAMA_MODEL = "qwen3:0.6b"; @ChatModelConnection - public static ResourceDescriptor chatModelConnection() { + public static ResourceDescriptor javaChatModelConnection() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName()) + .addInitialArgument("endpoint", "http://localhost:11434") + .addInitialArgument("requestTimeout", 240) + .build(); + } + + @ChatModelConnection + public static ResourceDescriptor pythonChatModelConnection() { return ResourceDescriptor.Builder.newBuilder(PythonChatModelConnection.class.getName()) .addInitialArgument( "module", "flink_agents.integrations.chat_models.ollama_chat_model") @@ -73,16 +82,27 @@ public class ChatModelCrossLanguageAgent extends Agent { } @ChatModelSetup - public static ResourceDescriptor chatModel() { + public static ResourceDescriptor temperatureChatModel() { return ResourceDescriptor.Builder.newBuilder(PythonChatModelSetup.class.getName()) .addInitialArgument( "module", "flink_agents.integrations.chat_models.ollama_chat_model") .addInitialArgument("clazz", "OllamaChatModelSetup") - .addInitialArgument("connection", "chatModelConnection") + .addInitialArgument("connection", "javaChatModelConnection") .addInitialArgument("model", OLLAMA_MODEL) + .addInitialArgument("tools", List.of("convertTemperature")) + .addInitialArgument("extract_reasoning", "true") + .build(); + } + + @ChatModelSetup + public static ResourceDescriptor chatModel() { + return ResourceDescriptor.Builder.newBuilder(PythonChatModelSetup.class.getName()) .addInitialArgument( - "tools", - List.of("calculateBMI", "convertTemperature", "createRandomNumber")) + "module", "flink_agents.integrations.chat_models.ollama_chat_model") + .addInitialArgument("clazz", "OllamaChatModelSetup") + .addInitialArgument("connection", "pythonChatModelConnection") + .addInitialArgument("model", OLLAMA_MODEL) + .addInitialArgument("tools", List.of("calculateBMI", "createRandomNumber")) .addInitialArgument("extract_reasoning", "true") .build(); } @@ -133,9 +153,16 @@ public class ChatModelCrossLanguageAgent extends Agent { @Action(listenEvents = {InputEvent.class}) public static void process(InputEvent event, RunnerContext ctx) throws Exception { + String model; + if (event.getInput().toString().contains("temperature") + || event.getInput().toString().contains("degree")) { + model = "temperatureChatModel"; + } else { + model = "chatModel"; + } ctx.sendEvent( new ChatRequestEvent( - "chatModel", + model, Collections.singletonList( new ChatMessage(MessageRole.USER, (String) event.getInput())))); } diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py index 9debb3c..6a3a84e 100644 --- a/python/flink_agents/runtime/python_java_utils.py +++ b/python/flink_agents/runtime/python_java_utils.py @@ -23,9 +23,10 @@ from pemja import findClass from flink_agents.api.chat_message import ChatMessage, MessageRole from flink_agents.api.events.event import InputEvent -from flink_agents.api.resource import Resource +from flink_agents.api.resource import Resource, ResourceType, get_resource_class from flink_agents.api.tools.tool import ToolMetadata from flink_agents.api.tools.utils import create_model_from_java_tool_schema_str +from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING from flink_agents.runtime.java.java_resource_wrapper import ( JavaGetResourceWrapper, JavaPrompt, @@ -106,6 +107,27 @@ def from_java_prompt(j_prompt: Any) -> JavaPrompt: """ return JavaPrompt(j_prompt=j_prompt) +def from_java_resource(type_name: str, kwargs: Dict[str, Any]) -> Resource: + """Convert a Java resource object to a Python Resource instance. + This function is used to convert a Java resource object to a Python Resource + instance. + + Args: + type_name: Java resource type name + kwargs: Keyword arguments + Returns: + Resource: Python wrapper for the Java resource + """ + class_path = JAVA_RESOURCE_MAPPING.get(ResourceType(type_name)) + if not class_path: + err_msg = f"No Java resource mapping found for {type_name}" + raise ValueError(err_msg) + + module_path, class_name = class_path.rsplit(".", 1) + cls = get_resource_class(module_path, class_name) + + return cls(**kwargs) + def normalize_tool_call_id(tool_call: Dict[str, Any]) -> Dict[str, Any]: """Normalize tool call by converting the ID field to string format while preserving all other fields. diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index 6b12a6d..084ad1f 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -620,7 +620,8 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT throw new RuntimeException(e); } }, - pythonInterpreter); + pythonInterpreter, + javaResourceAdapter); pythonResourceAdapter.open(); agentPlan.setPythonResourceAdapter(pythonResourceAdapter); } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java index cd9b935..b167e19 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.agents.runtime.python.utils; import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; import org.apache.flink.agents.api.prompt.Prompt; import org.apache.flink.agents.api.resource.Resource; import org.apache.flink.agents.api.resource.ResourceType; @@ -27,6 +28,7 @@ import org.apache.flink.agents.api.tools.Tool; import pemja.core.PythonInterpreter; import pemja.core.object.PyObject; +import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; @@ -34,6 +36,10 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { static final String PYTHON_IMPORTS = "from flink_agents.runtime import python_java_utils"; + static final String JAVA_RESOURCE = "j_resource"; + + static final String JAVA_RESOURCE_ADAPTER = "j_resource_adapter"; + static final String GET_RESOURCE_KEY = "get_resource"; static final String PYTHON_MODULE_PREFIX = "python_java_utils."; @@ -44,22 +50,28 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { static final String CREATE_RESOURCE = PYTHON_MODULE_PREFIX + "create_resource"; + static final String FROM_JAVA_RESOURCE = PYTHON_MODULE_PREFIX + "from_java_resource"; + static final String FROM_JAVA_TOOL = PYTHON_MODULE_PREFIX + "from_java_tool"; static final String FROM_JAVA_PROMPT = PYTHON_MODULE_PREFIX + "from_java_prompt"; static final String FROM_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + "from_java_chat_message"; - static final String TO_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + "to_java_chat_message"; + static final String TO_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + "update_java_chat_message"; private final BiFunction<String, ResourceType, Resource> getResource; private final PythonInterpreter interpreter; + private final JavaResourceAdapter javaResourceAdapter; private Object pythonGetResourceFunction; public PythonResourceAdapterImpl( - BiFunction<String, ResourceType, Resource> getResource, PythonInterpreter interpreter) { + BiFunction<String, ResourceType, Resource> getResource, + PythonInterpreter interpreter, + JavaResourceAdapter javaResourceAdapter) { this.getResource = getResource; this.interpreter = interpreter; + this.javaResourceAdapter = javaResourceAdapter; } public void open() { @@ -80,7 +92,15 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { if (resource instanceof Prompt) { return convertToPythonPrompt((Prompt) resource); } - return resource; + return toPythonResource(resourceType, resource); + } + + private Object toPythonResource(String resourceType, Resource resource) { + Map<String, Object> kwargs = new HashMap<>(); + kwargs.put(JAVA_RESOURCE, resource); + kwargs.put(JAVA_RESOURCE_ADAPTER, javaResourceAdapter); + kwargs.put(GET_RESOURCE_KEY, pythonGetResourceFunction); + return interpreter.invoke(FROM_JAVA_RESOURCE, resourceType, kwargs); } @Override @@ -96,10 +116,13 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { @Override public ChatMessage fromPythonChatMessage(Object pythonChatMessage) { - ChatMessage message = - (ChatMessage) interpreter.invoke(TO_JAVA_CHAT_MESSAGE, pythonChatMessage); + // TODO: Update this method after the pemja findClass method is fixed. + ChatMessage chatMessage = new ChatMessage(); - return message; + String roleValue = + (String) interpreter.invoke(TO_JAVA_CHAT_MESSAGE, pythonChatMessage, chatMessage); + chatMessage.setRole(MessageRole.fromValue(roleValue)); + return chatMessage; } @Override diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java index 18ee02e..282c2ae 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImplTest.java @@ -49,7 +49,7 @@ public class PythonResourceAdapterImplTest { @BeforeEach void setUp() throws Exception { mocks = MockitoAnnotations.openMocks(this); - pythonResourceAdapter = new PythonResourceAdapterImpl(getResource, mockInterpreter); + pythonResourceAdapter = new PythonResourceAdapterImpl(getResource, mockInterpreter, null); } @AfterEach @@ -152,10 +152,13 @@ public class PythonResourceAdapterImplTest { when(getResource.apply(resourceName, ResourceType.CHAT_MODEL)).thenReturn(mockResource); - Object result = pythonResourceAdapter.getResource(resourceName, resourceType); + pythonResourceAdapter.getResource(resourceName, resourceType); - assertThat(result).isEqualTo(mockResource); - verify(getResource).apply(resourceName, ResourceType.CHAT_MODEL); + verify(mockInterpreter) + .invoke( + eq(PythonResourceAdapterImpl.FROM_JAVA_RESOURCE), + eq(resourceType), + any(Map.class)); } @Test
