This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
The following commit(s) were added to refs/heads/main by this push:
new 279802b refactor(llm): use asyncio to embed text (#215)
279802b is described below
commit 279802b2fbfa8e06e8092735371758b161372b2b
Author: Moncef <[email protected]>
AuthorDate: Tue May 20 11:53:56 2025 +0100
refactor(llm): use asyncio to embed text (#215)
This PR refactors the embedding retrieval by replacing the synchronous
ThreadPoolExecutor with an asynchronous implementation using asyncio.
- Introduces an async function that retrieves embeddings concurrently,
using a semaphore to cap the number of in-flight requests.
- Updates the run method to use asyncio.run for executing the async
embedding retrieval.
---
Co-authored-by: imbajin <[email protected]>
---
.../operators/index_op/build_semantic_index.py | 38 ++++++++++++++--------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git
a/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
b/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
index 66ab259..f8a911e 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
@@ -16,6 +16,7 @@
# under the License.
+import asyncio
import os
from typing import Any, Dict
@@ -24,8 +25,9 @@ from tqdm import tqdm
from hugegraph_llm.config import resource_path, huge_settings
from hugegraph_llm.indices.vector_index import VectorIndex
from hugegraph_llm.models.embeddings.base import BaseEmbedding
-from hugegraph_llm.utils.log import log
from hugegraph_llm.operators.hugegraph_op.schema_manager import SchemaManager
+from hugegraph_llm.utils.log import log
+
class BuildSemanticIndex:
def __init__(self, embedding: BaseEmbedding):
@@ -37,20 +39,33 @@ class BuildSemanticIndex:
def _extract_names(self, vertices: list[str]) -> list[str]:
return [v.split(":")[1] for v in vertices]
- # TODO: use asyncio for IO tasks
- def _get_embeddings_parallel(self, vids: list[str]) -> list[Any]:
- from concurrent.futures import ThreadPoolExecutor
- with ThreadPoolExecutor() as executor:
- embeddings =
list(tqdm(executor.map(self.embedding.get_text_embedding, vids),
total=len(vids)))
+ async def _get_embeddings_parallel(self, vids: list[str]) -> list[Any]:
+ sem = asyncio.Semaphore(10)
+
+ async def get_embedding_with_semaphore(vid: str) -> Any:
+ # Executes sync embedding method in a thread pool via
loop.run_in_executor, combining async programming
+ # with multi-threading capabilities.
+ # This pattern avoids blocking the event loop and prepares for a
future fully async pipeline.
+ async with sem:
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None,
self.embedding.get_text_embedding, vid)
+
+ tasks = [get_embedding_with_semaphore(vid) for vid in vids]
+ embeddings = []
+ with tqdm(total=len(tasks)) as pbar:
+ for future in asyncio.as_completed(tasks):
+ embedding = await future
+ embeddings.append(embedding)
+ pbar.update(1)
return embeddings
def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
vertexlabels = self.sm.schema.getSchema()["vertexlabels"]
- all_pk_flag = all(data.get('id_strategy') == 'PRIMARY_KEY' for data in
vertexlabels)
+ all_pk_flag = all(data.get("id_strategy") == "PRIMARY_KEY" for data in
vertexlabels)
past_vids = self.vid_index.properties
# TODO: We should build vid vector index separately, especially when
the vertices may be very large
- present_vids = context["vertices"] # Warning: data truncated by
fetch_graph_data.py
+ present_vids = context["vertices"] # Warning: data truncated by
fetch_graph_data.py
removed_vids = set(past_vids) - set(present_vids)
removed_num = self.vid_index.remove(removed_vids)
if removed_vids:
@@ -59,14 +74,11 @@ class BuildSemanticIndex:
if added_vids:
vids_to_process = self._extract_names(added_vids) if all_pk_flag
else added_vids
- added_embeddings = self._get_embeddings_parallel(vids_to_process)
+ added_embeddings =
asyncio.run(self._get_embeddings_parallel(vids_to_process))
log.info("Building vector index for %s vertices...",
len(added_vids))
self.vid_index.add(added_embeddings, added_vids)
self.vid_index.to_index_file(self.index_dir)
else:
log.debug("No update vertices to build vector index.")
- context.update({
- "removed_vid_vector_num": removed_num,
- "added_vid_vector_num": len(added_vids)
- })
+ context.update({"removed_vid_vector_num": removed_num,
"added_vid_vector_num": len(added_vids)})
return context