This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git


The following commit(s) were added to refs/heads/main by this push:
     new 279802b  refactor(llm): use asyncio to embed text (#215)
279802b is described below

commit 279802b2fbfa8e06e8092735371758b161372b2b
Author: Moncef <[email protected]>
AuthorDate: Tue May 20 11:53:56 2025 +0100

    refactor(llm): use asyncio to embed text (#215)
    
    This PR refactors the embedding retrieval by replacing the synchronous 
ThreadPoolExecutor with an asynchronous implementation using asyncio.
    - Introduces an async function to retrieve embeddings concurrently with a 
semaphore to limit concurrency.
    - Updates the run method to use asyncio.run for executing the async 
embedding retrieval.
    
    ---
    Co-authored-by: imbajin <[email protected]>
---
 .../operators/index_op/build_semantic_index.py     | 38 ++++++++++++++--------
 1 file changed, 25 insertions(+), 13 deletions(-)

diff --git 
a/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py 
b/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
index 66ab259..f8a911e 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
@@ -16,6 +16,7 @@
 # under the License.
 
 
+import asyncio
 import os
 from typing import Any, Dict
 
@@ -24,8 +25,9 @@ from tqdm import tqdm
 from hugegraph_llm.config import resource_path, huge_settings
 from hugegraph_llm.indices.vector_index import VectorIndex
 from hugegraph_llm.models.embeddings.base import BaseEmbedding
-from hugegraph_llm.utils.log import log
 from hugegraph_llm.operators.hugegraph_op.schema_manager import SchemaManager
+from hugegraph_llm.utils.log import log
+
 
 class BuildSemanticIndex:
     def __init__(self, embedding: BaseEmbedding):
@@ -37,20 +39,33 @@ class BuildSemanticIndex:
     def _extract_names(self, vertices: list[str]) -> list[str]:
         return [v.split(":")[1] for v in vertices]
 
-    # TODO: use asyncio for IO tasks
-    def _get_embeddings_parallel(self, vids: list[str]) -> list[Any]:
-        from concurrent.futures import ThreadPoolExecutor
-        with ThreadPoolExecutor() as executor:
-            embeddings = 
list(tqdm(executor.map(self.embedding.get_text_embedding, vids), 
total=len(vids)))
+    async def _get_embeddings_parallel(self, vids: list[str]) -> list[Any]:
+        sem = asyncio.Semaphore(10)
+
+        async def get_embedding_with_semaphore(vid: str) -> Any:
+            # Executes sync embedding method in a thread pool via 
loop.run_in_executor, combining async programming
+            # with multi-threading capabilities.
+            # This pattern avoids blocking the event loop and prepares for a 
future fully async pipeline.
+            async with sem:
+                loop = asyncio.get_running_loop()
+                return await loop.run_in_executor(None, 
self.embedding.get_text_embedding, vid)
+
+        tasks = [get_embedding_with_semaphore(vid) for vid in vids]
+        embeddings = []
+        with tqdm(total=len(tasks)) as pbar:
+            for future in asyncio.as_completed(tasks):
+                embedding = await future
+                embeddings.append(embedding)
+                pbar.update(1)
         return embeddings
 
     def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
         vertexlabels = self.sm.schema.getSchema()["vertexlabels"]
-        all_pk_flag = all(data.get('id_strategy') == 'PRIMARY_KEY' for data in 
vertexlabels)
+        all_pk_flag = all(data.get("id_strategy") == "PRIMARY_KEY" for data in 
vertexlabels)
 
         past_vids = self.vid_index.properties
         # TODO: We should build vid vector index separately, especially when 
the vertices may be very large
-        present_vids = context["vertices"] # Warning: data truncated by 
fetch_graph_data.py
+        present_vids = context["vertices"]  # Warning: data truncated by 
fetch_graph_data.py
         removed_vids = set(past_vids) - set(present_vids)
         removed_num = self.vid_index.remove(removed_vids)
         if removed_vids:
@@ -59,14 +74,11 @@ class BuildSemanticIndex:
 
         if added_vids:
             vids_to_process = self._extract_names(added_vids) if all_pk_flag 
else added_vids
-            added_embeddings = self._get_embeddings_parallel(vids_to_process)
+            added_embeddings = 
asyncio.run(self._get_embeddings_parallel(vids_to_process))
             log.info("Building vector index for %s vertices...", 
len(added_vids))
             self.vid_index.add(added_embeddings, added_vids)
             self.vid_index.to_index_file(self.index_dir)
         else:
             log.debug("No update vertices to build vector index.")
-        context.update({
-            "removed_vid_vector_num": removed_num,
-            "added_vid_vector_num": len(added_vids)
-        })
+        context.update({"removed_vid_vector_num": removed_num, 
"added_vid_vector_num": len(added_vids)})
         return context

Reply via email to