This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit b1d65de3b23ef06ff0f7f2fde7a2aad9951f04fe
Author: WenjinXie <[email protected]>
AuthorDate: Sat May 16 20:34:01 2026 +0800

    [hotfix] Embed on Python side for cross-language vector store
    
    When a JavaCollectionManageableVectorStore wraps a Java vector store
    backed by a Python embedding model, the previous flow delegated query/
    add/update to the Java side, which then called the embedding model from
    Java. For a PYTHON-backed embedding model that callback is a Java→Python
    re-entry on the Flink operator thread, and pemja's sub-interpreter state
    on that thread ends up corrupted; the next pemja conversion in the same
    thread crashes inside JcpPyDecimal_Check → PyImport_ImportModule with a
    NULL sys.modules dereference.
    
    Drop the add/query/update overrides in JavaVectorStoreImpl and inherit
    BaseVectorStore's canonical flow: embed in Python first, then make a
    single-direction Python→Java call to addEmbedding/queryEmbedding/
    updateEmbedding. open() now also chains super().open() so the embedding
    model resolves from string to instance before the Python embed step
    runs. fromPythonVectorStoreQuery (its only caller is gone) and the
    matching python_java_utils helpers (get_mode_value,
    from_java_vector_store_query_result) are removed.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../flink_agents/runtime/java/java_vector_store.py | 48 ++++------------------
 python/flink_agents/runtime/python_java_utils.py   | 15 -------
 .../runtime/python/utils/JavaResourceAdapter.java  | 19 ---------
 3 files changed, 9 insertions(+), 73 deletions(-)

diff --git a/python/flink_agents/runtime/java/java_vector_store.py b/python/flink_agents/runtime/java/java_vector_store.py
index 923482df..76e54754 100644
--- a/python/flink_agents/runtime/java/java_vector_store.py
+++ b/python/flink_agents/runtime/java/java_vector_store.py
@@ -25,14 +25,9 @@ from flink_agents.api.vector_stores.java_vector_store import (
 )
 from flink_agents.api.vector_stores.vector_store import (
     Document,
-    VectorStoreQuery,
-    VectorStoreQueryResult,
     _maybe_cast_to_list,
 )
-from flink_agents.runtime.python_java_utils import (
-    from_java_document,
-    from_java_vector_store_query_result,
-)
+from flink_agents.runtime.python_java_utils import from_java_document
 
 
 class JavaVectorStoreImpl(JavaCollectionManageableVectorStore):
@@ -68,28 +63,16 @@ class JavaVectorStoreImpl(JavaCollectionManageableVectorStore):
 
     @override
     def open(self) -> None:
+        # Resolve ``embedding_model`` (string → BaseEmbeddingModelSetup instance) on
+        # the Python side via ``BaseVectorStore.open`` so that ``add``/``query``/
+        # ``update`` can embed in Python before crossing to Java. Doing the embed
+        # on the Java side would force a Java→Python re-entry through pemja for
+        # PYTHON-backed embedding models, which corrupts the CPython per-thread
+        # state and crashes the next ``interpreter.get(...)`` inside
+        # ``JcpPyDecimal_Check → PyImport_ImportModule``.
+        super().open()
         self._j_resource.open()
 
-    @override
-    def add(
-        self,
-        documents: Document | List[Document],
-        collection_name: str | None = None,
-        **kwargs: Any,
-    ) -> List[str]:
-        documents = _maybe_cast_to_list(documents)
-        j_documents = [
-            _to_j_document(self._j_resource_adapter, doc) for doc in documents
-        ]
-
-        return self._j_resource.add(j_documents, collection_name, kwargs)
-
-    @override
-    def query(self, query: VectorStoreQuery) -> VectorStoreQueryResult:
-        j_query = self._j_resource_adapter.fromPythonVectorStoreQuery(query)
-        j_query_result = self._j_resource.query(j_query)
-        return from_java_vector_store_query_result(j_query_result)
-
     @override
     def get(
         self,
@@ -114,19 +97,6 @@ class JavaVectorStoreImpl(JavaCollectionManageableVectorStore):
         ids = _maybe_cast_to_list(ids)
         return self._j_resource.delete(ids, collection_name, filters, kwargs)
 
-    @override
-    def update(
-        self,
-        documents: Document | List[Document],
-        collection_name: str | None = None,
-        **kwargs: Any,
-    ) -> None:
-        documents = _maybe_cast_to_list(documents)
-        j_documents = [
-            _to_j_document(self._j_resource_adapter, doc) for doc in documents
-        ]
-        self._j_resource.update(j_documents, collection_name, kwargs)
-
     @override
     def create_collection_if_not_exists(self, name: str, **kwargs: Any) -> None:
         """Forward to the Java side, passing all kwargs through as a map; the Java
diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py
index 7d494638..58389c82 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -34,7 +34,6 @@ from flink_agents.api.vector_stores.vector_store import (
     Document,
     VectorStoreQuery,
     VectorStoreQueryMode,
-    VectorStoreQueryResult,
 )
 from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING
 from flink_agents.runtime.java.java_resource_wrapper import (
@@ -263,15 +262,6 @@ def from_java_vector_store_query(j_query: Any) -> VectorStoreQuery:
     )
 
 
-def from_java_vector_store_query_result(j_query: Any) -> VectorStoreQueryResult:
-    """Convert a Java vector store query result to a Python query result."""
-    return VectorStoreQueryResult(
-        documents=[
-            from_java_document(j_document) for j_document in j_query.getDocuments()
-        ],
-    )
-
-
 def from_java_message_role(j_role: Any) -> MessageRole:
     """Convert a Java message role to a Python message role."""
     return MessageRole(j_role.getValue())
@@ -288,11 +278,6 @@ def get_java_tool_metadata_from_tool(tool: Tool) -> typing.Dict[str, str]:
     }
 
 
-def get_mode_value(query: VectorStoreQuery) -> str:
-    """Get the mode value of a VectorStoreQuery."""
-    return query.mode.value
-
-
 def get_long_term_memory(ctx: Any) -> Any:
     """Return ``ctx.long_term_memory`` (or ``None``). Used by the Java side to
     avoid relying on Pemja's ``PyObject.getAttr`` semantics for attributes that
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
index fbb0365e..f17d7ce7 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
@@ -23,10 +23,7 @@ import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceContext;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.vectorstores.Document;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQueryMode;
 import pemja.core.PythonInterpreter;
-import pemja.core.object.PyObject;
 
 import java.util.List;
 import java.util.Map;
@@ -108,20 +105,4 @@ public class JavaResourceAdapter {
             Float score) {
         return new Document(content, metadata, id, embedding, score);
     }
-
-    @SuppressWarnings("unchecked")
-    public VectorStoreQuery fromPythonVectorStoreQuery(PyObject pythonVectorStoreQuery) {
-        // TODO: Delete this method after the pemja findClass method is fixed.
-        String modeValue =
-                (String)
-                        interpreter.invoke(
-                                "python_java_utils.get_mode_value", pythonVectorStoreQuery);
-        return new VectorStoreQuery(
-                VectorStoreQueryMode.fromValue(modeValue),
-                (String) pythonVectorStoreQuery.getAttr("query_text"),
-                pythonVectorStoreQuery.getAttr("limit", Integer.class),
-                (String) pythonVectorStoreQuery.getAttr("collection_name"),
-                (Map<String, Object>) pythonVectorStoreQuery.getAttr("filters", Map.class),
-                (Map<String, Object>) pythonVectorStoreQuery.getAttr("extra_args", Map.class));
-    }
 }

Reply via email to