This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit c9905019a4e0e08f924aba90a1405d4bf3926d84
Author: youjin <[email protected]>
AuthorDate: Mon Jan 12 19:12:24 2026 +0800

    [feature] Support Python resource retrieval through the Java resource using 
the getResource function
---
 .../resourceprovider/PythonResourceProvider.java   | 39 +++++++++++---
 .../apache/flink/agents/plan/AgentPlanTest.java    |  3 +-
 .../chat_model_cross_language_agent.py             | 17 ++++--
 .../runtime/operator/ActionExecutionOperator.java  | 21 ++++++--
 .../runtime/python/utils/JavaResourceAdapter.java  | 61 +++-------------------
 5 files changed, 71 insertions(+), 70 deletions(-)

diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
 
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
index f70833f..9f77676 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.agents.plan.resourceprovider;
 
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelConnection;
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
+import 
org.apache.flink.agents.api.embedding.model.python.PythonEmbeddingModelConnection;
+import 
org.apache.flink.agents.api.embedding.model.python.PythonEmbeddingModelSetup;
 import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
@@ -26,6 +30,7 @@ import pemja.core.object.PyObject;
 
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiFunction;
 
@@ -40,6 +45,13 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class PythonResourceProvider extends ResourceProvider {
     private final ResourceDescriptor descriptor;
 
+    private static final Map<ResourceType, Class<?>> RESOURCE_TYPE_TO_CLASS =
+            Map.of(
+                    ResourceType.CHAT_MODEL, PythonChatModelSetup.class,
+                    ResourceType.CHAT_MODEL_CONNECTION, 
PythonChatModelConnection.class,
+                    ResourceType.EMBEDDING_MODEL, 
PythonEmbeddingModelSetup.class,
+                    ResourceType.EMBEDDING_MODEL_CONNECTION, 
PythonEmbeddingModelConnection.class);
+
     protected PythonResourceAdapter pythonResourceAdapter;
 
     public PythonResourceProvider(String name, ResourceType type, 
ResourceDescriptor descriptor) {
@@ -59,17 +71,30 @@ public class PythonResourceProvider extends 
ResourceProvider {
     public Resource provide(BiFunction<String, ResourceType, Resource> 
getResource)
             throws Exception {
         checkState(pythonResourceAdapter != null, "PythonResourceAdapter is 
not set");
-        Class<?> clazz = Class.forName(descriptor.getClazz());
+
+        Class<?> clazz = RESOURCE_TYPE_TO_CLASS.get(getType());
+        if (clazz == null) {
+            throw new UnsupportedOperationException(
+                    "Unsupported python resource type: " + getType());
+        }
 
         HashMap<String, Object> kwargs = new 
HashMap<>(descriptor.getInitialArguments());
-        String pyModule = (String) kwargs.remove("module");
+        String pyModule = descriptor.getModule();
+        String pyClazz = descriptor.getClazz();
+
+        // Extract module and class from kwargs if not provided in descriptor
         if (pyModule == null || pyModule.isEmpty()) {
-            throw new IllegalArgumentException("module should not be null or 
empty.");
-        }
-        String pyClazz = (String) kwargs.remove("clazz");
-        if (pyClazz == null || pyClazz.isEmpty()) {
-            throw new IllegalArgumentException("clazz should not be null or 
empty.");
+            pyModule = (String) kwargs.remove("module");
+            if (pyModule == null || pyModule.isEmpty()) {
+                throw new IllegalArgumentException("module should not be null 
or empty.");
+            }
+
+            pyClazz = (String) kwargs.remove("clazz");
+            if (pyClazz == null || pyClazz.isEmpty()) {
+                throw new IllegalArgumentException("clazz should not be null 
or empty.");
+            }
         }
+
         PyObject pyResource = 
pythonResourceAdapter.initPythonResource(pyModule, pyClazz, kwargs);
         Constructor<?> constructor =
                 clazz.getConstructor(
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
index 9179ea2..ae95f40 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.agents.api.agents.Agent;
 import org.apache.flink.agents.api.annotation.ChatModelSetup;
 import org.apache.flink.agents.api.annotation.Tool;
 import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
@@ -440,7 +441,7 @@ public class AgentPlanTest {
         Resource pythonChatModel =
                 agentPlan.getResource("pythonChatModel", 
ResourceType.CHAT_MODEL);
         assertThat(pythonChatModel).isNotNull();
-        assertThat(pythonChatModel).isInstanceOf(TestPythonResource.class);
+        assertThat(pythonChatModel).isInstanceOf(PythonChatModelSetup.class);
         
assertThat(pythonChatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
 
         // Test that resources are cached (should be the same instance)
diff --git 
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_agent.py
 
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_agent.py
index 796ab80..929500f 100644
--- 
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_agent.py
+++ 
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_agent.py
@@ -35,6 +35,9 @@ from flink_agents.api.events.event import InputEvent, 
OutputEvent
 from flink_agents.api.prompts.prompt import Prompt
 from flink_agents.api.resource import ResourceDescriptor
 from flink_agents.api.runner_context import RunnerContext
+from flink_agents.integrations.chat_models.ollama_chat_model import (
+    OllamaChatModelConnection,
+)
 
 
 class ChatModelCrossLanguageAgent(Agent):
@@ -67,7 +70,15 @@ class ChatModelCrossLanguageAgent(Agent):
 
     @chat_model_connection
     @staticmethod
-    def ollama_connection() -> ResourceDescriptor:
+    def ollama_connection_python() -> ResourceDescriptor:
+        """ChatModelConnection responsible for ollama model service 
connection."""
+        return ResourceDescriptor(
+            clazz=OllamaChatModelConnection, request_timeout=240.0
+        )
+
+    @chat_model_connection
+    @staticmethod
+    def ollama_connection_java() -> ResourceDescriptor:
         """ChatModelConnection responsible for ollama model service 
connection."""
         return ResourceDescriptor(
             clazz=JavaChatModelConnection,
@@ -82,7 +93,7 @@ class ChatModelCrossLanguageAgent(Agent):
         """ChatModel which focus on math, and reuse ChatModelConnection."""
         return ResourceDescriptor(
             clazz=JavaChatModelSetup,
-            connection="ollama_connection",
+            connection="ollama_connection_python",
             
java_clazz="org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup",
             model=os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b"),
             prompt="from_messages_prompt",
@@ -96,7 +107,7 @@ class ChatModelCrossLanguageAgent(Agent):
         """ChatModel which focus on text generate, and reuse 
ChatModelConnection."""
         return ResourceDescriptor(
             clazz=JavaChatModelSetup,
-            connection="ollama_connection",
+            connection="ollama_connection_java",
             
java_clazz="org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup",
             model=os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:1.7b"),
             prompt="from_text_prompt",
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index d44d0b8..6b12a6d 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.agents.api.logger.EventLogger;
 import org.apache.flink.agents.api.logger.EventLoggerConfig;
 import org.apache.flink.agents.api.logger.EventLoggerFactory;
 import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.JavaFunction;
@@ -142,6 +143,9 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
     // PythonResourceAdapter for Python resources in Java actions
     private transient PythonResourceAdapterImpl pythonResourceAdapter;
 
+    // PythonResourceAdapter for Java resources in Python actions or Python 
resources
+    private transient JavaResourceAdapter javaResourceAdapter;
+
     private transient FlinkAgentsMetricGroupImpl metricGroup;
 
     private transient BuiltInMetrics builtInMetrics;
@@ -539,6 +543,14 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         }
     }
 
+    private Resource getResource(String name, ResourceType type) {
+        try {
+            return agentPlan.getResource(name, type);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private void initPythonEnvironment() throws Exception {
         boolean containPythonAction =
                 agentPlan.getActions().values().stream()
@@ -576,17 +588,18 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
             pythonRunnerContext =
                     new PythonRunnerContextImpl(
                             this.metricGroup, this::checkMailboxThread, 
this.agentPlan);
+
+            javaResourceAdapter = new JavaResourceAdapter(this::getResource, 
pythonInterpreter);
+            if (containPythonResource) {
+                initPythonResourceAdapter();
+            }
             if (containPythonAction) {
                 initPythonActionExecutor();
-            } else {
-                initPythonResourceAdapter();
             }
         }
     }
 
     private void initPythonActionExecutor() throws Exception {
-        JavaResourceAdapter javaResourceAdapter =
-                new JavaResourceAdapter(agentPlan, pythonInterpreter);
         pythonActionExecutor =
                 new PythonActionExecutor(
                         pythonInterpreter,
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
index fcdc6dc..7b6c009 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
@@ -21,29 +21,20 @@ import 
org.apache.flink.agents.api.chat.messages.ChatMessage;
 import org.apache.flink.agents.api.chat.messages.MessageRole;
 import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceType;
-import org.apache.flink.agents.plan.AgentPlan;
-import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
-import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
 import pemja.core.PythonInterpreter;
 
-import javax.naming.OperationNotSupportedException;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
 
 /** Adapter for managing Java resources and facilitating Python-Java 
interoperability. */
 public class JavaResourceAdapter {
-    private final Map<ResourceType, Map<String, ResourceProvider>> 
resourceProviders;
+    private final BiFunction<String, ResourceType, Resource> getResource;
 
     private final transient PythonInterpreter interpreter;
 
-    /** Cache for instantiated resources. */
-    private final transient Map<ResourceType, Map<String, Resource>> 
resourceCache;
-
-    public JavaResourceAdapter(AgentPlan agentPlan, PythonInterpreter 
interpreter) {
-        this.resourceProviders = agentPlan.getResourceProviders();
+    public JavaResourceAdapter(
+            BiFunction<String, ResourceType, Resource> getResource, 
PythonInterpreter interpreter) {
+        this.getResource = getResource;
         this.interpreter = interpreter;
-        this.resourceCache = new ConcurrentHashMap<>();
     }
 
     /**
@@ -56,47 +47,7 @@ public class JavaResourceAdapter {
      * @throws Exception if the resource cannot be retrieved
      */
     public Resource getResource(String name, String typeValue) throws 
Exception {
-        return getResource(name, ResourceType.fromValue(typeValue));
-    }
-
-    /**
-     * Retrieves a Java resource by name and type.
-     *
-     * @param name the name of the resource to retrieve
-     * @param type the type of the resource
-     * @return the resource
-     * @throws Exception if the resource cannot be retrieved
-     */
-    public Resource getResource(String name, ResourceType type) throws 
Exception {
-        if (resourceCache.containsKey(type) && 
resourceCache.get(type).containsKey(name)) {
-            return resourceCache.get(type).get(name);
-        }
-
-        if (!resourceProviders.containsKey(type)
-                || !resourceProviders.get(type).containsKey(name)) {
-            throw new IllegalArgumentException("Resource not found: " + name + 
" of type " + type);
-        }
-
-        ResourceProvider provider = resourceProviders.get(type).get(name);
-        if (provider instanceof PythonResourceProvider) {
-            // TODO: Support getting resources from PythonResourceProvider in 
JavaResourceAdapter.
-            throw new OperationNotSupportedException("PythonResourceProvider 
is not supported.");
-        }
-
-        Resource resource =
-                provider.provide(
-                        (String anotherName, ResourceType anotherType) -> {
-                            try {
-                                return this.getResource(anotherName, 
anotherType);
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-
-        // Cache the resource
-        resourceCache.computeIfAbsent(type, k -> new 
ConcurrentHashMap<>()).put(name, resource);
-
-        return resource;
+        return getResource.apply(name, ResourceType.fromValue(typeValue));
     }
 
     /**

Reply via email to