This is an automated email from the ASF dual-hosted git repository. wenjin272 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit b1d65de3b23ef06ff0f7f2fde7a2aad9951f04fe Author: WenjinXie <[email protected]> AuthorDate: Sat May 16 20:34:01 2026 +0800 [hotfix] Embed on Python side for cross-language vector store When a JavaCollectionManageableVectorStore wraps a Java vector store backed by a Python embedding model, the previous flow delegated query/add/update to the Java side, which then called the embedding model from Java. For a PYTHON-backed embedding model that callback is a Java→Python re-entry on the Flink operator thread, and pemja's sub-interpreter state on that thread ends up corrupted; the next pemja conversion in the same thread crashes inside JcpPyDecimal_Check → PyImport_ImportModule with a NULL sys.modules dereference. Drop the add/query/update overrides in JavaVectorStoreImpl and inherit BaseVectorStore's canonical flow: embed in Python first, then make a single-direction Python→Java call to addEmbedding/queryEmbedding/updateEmbedding. open() now also chains super().open() so the embedding model resolves from string to instance before the Python embed step runs. fromPythonVectorStoreQuery (its only caller is gone) and the matching python_java_utils helpers (get_mode_value, from_java_vector_store_query_result) are removed. 
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- .../flink_agents/runtime/java/java_vector_store.py | 48 ++++------------------ python/flink_agents/runtime/python_java_utils.py | 15 ------- .../runtime/python/utils/JavaResourceAdapter.java | 19 --------- 3 files changed, 9 insertions(+), 73 deletions(-) diff --git a/python/flink_agents/runtime/java/java_vector_store.py b/python/flink_agents/runtime/java/java_vector_store.py index 923482df..76e54754 100644 --- a/python/flink_agents/runtime/java/java_vector_store.py +++ b/python/flink_agents/runtime/java/java_vector_store.py @@ -25,14 +25,9 @@ from flink_agents.api.vector_stores.java_vector_store import ( ) from flink_agents.api.vector_stores.vector_store import ( Document, - VectorStoreQuery, - VectorStoreQueryResult, _maybe_cast_to_list, ) -from flink_agents.runtime.python_java_utils import ( - from_java_document, - from_java_vector_store_query_result, -) +from flink_agents.runtime.python_java_utils import from_java_document class JavaVectorStoreImpl(JavaCollectionManageableVectorStore): @@ -68,28 +63,16 @@ class JavaVectorStoreImpl(JavaCollectionManageableVectorStore): @override def open(self) -> None: + # Resolve ``embedding_model`` (string → BaseEmbeddingModelSetup instance) on + # the Python side via ``BaseVectorStore.open`` so that ``add``/``query``/ + # ``update`` can embed in Python before crossing to Java. Doing the embed + # on the Java side would force a Java→Python re-entry through pemja for + # PYTHON-backed embedding models, which corrupts the CPython per-thread + # state and crashes the next ``interpreter.get(...)`` inside + # ``JcpPyDecimal_Check → PyImport_ImportModule``. 
+ super().open() self._j_resource.open() - @override - def add( - self, - documents: Document | List[Document], - collection_name: str | None = None, - **kwargs: Any, - ) -> List[str]: - documents = _maybe_cast_to_list(documents) - j_documents = [ - _to_j_document(self._j_resource_adapter, doc) for doc in documents - ] - - return self._j_resource.add(j_documents, collection_name, kwargs) - - @override - def query(self, query: VectorStoreQuery) -> VectorStoreQueryResult: - j_query = self._j_resource_adapter.fromPythonVectorStoreQuery(query) - j_query_result = self._j_resource.query(j_query) - return from_java_vector_store_query_result(j_query_result) - @override def get( self, @@ -114,19 +97,6 @@ class JavaVectorStoreImpl(JavaCollectionManageableVectorStore): ids = _maybe_cast_to_list(ids) return self._j_resource.delete(ids, collection_name, filters, kwargs) - @override - def update( - self, - documents: Document | List[Document], - collection_name: str | None = None, - **kwargs: Any, - ) -> None: - documents = _maybe_cast_to_list(documents) - j_documents = [ - _to_j_document(self._j_resource_adapter, doc) for doc in documents - ] - self._j_resource.update(j_documents, collection_name, kwargs) - @override def create_collection_if_not_exists(self, name: str, **kwargs: Any) -> None: """Forward to the Java side, passing all kwargs through as a map; the Java diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py index 7d494638..58389c82 100644 --- a/python/flink_agents/runtime/python_java_utils.py +++ b/python/flink_agents/runtime/python_java_utils.py @@ -34,7 +34,6 @@ from flink_agents.api.vector_stores.vector_store import ( Document, VectorStoreQuery, VectorStoreQueryMode, - VectorStoreQueryResult, ) from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING from flink_agents.runtime.java.java_resource_wrapper import ( @@ -263,15 +262,6 @@ def from_java_vector_store_query(j_query: Any) -> 
VectorStoreQuery: ) -def from_java_vector_store_query_result(j_query: Any) -> VectorStoreQueryResult: - """Convert a Java vector store query result to a Python query result.""" - return VectorStoreQueryResult( - documents=[ - from_java_document(j_document) for j_document in j_query.getDocuments() - ], - ) - - def from_java_message_role(j_role: Any) -> MessageRole: """Convert a Java message role to a Python message role.""" return MessageRole(j_role.getValue()) @@ -288,11 +278,6 @@ def get_java_tool_metadata_from_tool(tool: Tool) -> typing.Dict[str, str]: } -def get_mode_value(query: VectorStoreQuery) -> str: - """Get the mode value of a VectorStoreQuery.""" - return query.mode.value - - def get_long_term_memory(ctx: Any) -> Any: """Return ``ctx.long_term_memory`` (or ``None``). Used by the Java side to avoid relying on Pemja's ``PyObject.getAttr`` semantics for attributes that diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java index fbb0365e..f17d7ce7 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java @@ -23,10 +23,7 @@ import org.apache.flink.agents.api.resource.Resource; import org.apache.flink.agents.api.resource.ResourceContext; import org.apache.flink.agents.api.resource.ResourceType; import org.apache.flink.agents.api.vectorstores.Document; -import org.apache.flink.agents.api.vectorstores.VectorStoreQuery; -import org.apache.flink.agents.api.vectorstores.VectorStoreQueryMode; import pemja.core.PythonInterpreter; -import pemja.core.object.PyObject; import java.util.List; import java.util.Map; @@ -108,20 +105,4 @@ public class JavaResourceAdapter { Float score) { return new Document(content, metadata, id, embedding, score); } - - @SuppressWarnings("unchecked") - 
public VectorStoreQuery fromPythonVectorStoreQuery(PyObject pythonVectorStoreQuery) { - // TODO: Delete this method after the pemja findClass method is fixed. - String modeValue = - (String) - interpreter.invoke( - "python_java_utils.get_mode_value", pythonVectorStoreQuery); - return new VectorStoreQuery( - VectorStoreQueryMode.fromValue(modeValue), - (String) pythonVectorStoreQuery.getAttr("query_text"), - pythonVectorStoreQuery.getAttr("limit", Integer.class), - (String) pythonVectorStoreQuery.getAttr("collection_name"), - (Map<String, Object>) pythonVectorStoreQuery.getAttr("filters", Map.class), - (Map<String, Object>) pythonVectorStoreQuery.getAttr("extra_args", Map.class)); - } }
