This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
The following commit(s) were added to refs/heads/main by this push:
new f1713b0 feat(llm): timely execute vid embedding & enhance some HTTP
logic (#141)
f1713b0 is described below
commit f1713b0b245723a1a76f03a0b7a232812b6aff71
Author: SoJGooo <[email protected]>
AuthorDate: Tue Dec 24 15:17:36 2024 +0800
feat(llm): timely execute vid embedding & enhance some HTTP logic (#141)
1. change the location of the button **Rebuild vid Index**
2. After clicking the button **Load into GraphDB(2)**,
execute `import_graph_data` and `fit_vid_index` in sequence
3. execute `fit_vid_index` once an hour
4. fix a series of incorrect HTTP request usages of the `timeout` & retry params
---------
Co-authored-by: imbajin <[email protected]>
---
.github/workflows/pylint.yml | 1 -
.../src/hugegraph_llm/demo/rag_demo/app.py | 35 ++++++++++++++++--
.../hugegraph_llm/demo/rag_demo/configs_block.py | 4 +--
.../demo/rag_demo/vector_graph_block.py | 8 ++---
.../src/hugegraph_llm/models/rerankers/cohere.py | 2 +-
.../hugegraph_llm/models/rerankers/siliconflow.py | 2 +-
.../src/hugegraph_llm/utils/graph_index_utils.py | 42 +++++++++++++---------
hugegraph-python-client/src/pyhugegraph/client.py | 4 +--
.../src/pyhugegraph/utils/huge_config.py | 4 +--
.../src/pyhugegraph/utils/huge_requests.py | 4 +--
10 files changed, 71 insertions(+), 35 deletions(-)
diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml
index 34f3e9b..3969005 100644
--- a/.github/workflows/pylint.yml
+++ b/.github/workflows/pylint.yml
@@ -25,7 +25,6 @@ jobs:
- name: Install dependencies
run: |
- python -m pip install --upgrade pip
pip install pylint pytest
pip install -r ./hugegraph-llm/requirements.txt
pip install -r ./hugegraph-ml/requirements.txt
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
index 700e60b..1d94bd9 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
@@ -17,6 +17,8 @@
import argparse
+import asyncio
+from contextlib import asynccontextmanager
import gradio as gr
import uvicorn
@@ -35,15 +37,15 @@ from hugegraph_llm.demo.rag_demo.configs_block import (
apply_graph_config,
)
from hugegraph_llm.demo.rag_demo.other_block import create_other_block
-from hugegraph_llm.demo.rag_demo.text2gremlin_block import
create_text2gremlin_block, graph_rag_recall
from hugegraph_llm.demo.rag_demo.rag_block import create_rag_block, rag_answer
+from hugegraph_llm.demo.rag_demo.text2gremlin_block import
create_text2gremlin_block, graph_rag_recall
from hugegraph_llm.demo.rag_demo.vector_graph_block import
create_vector_graph_block
from hugegraph_llm.resources.demo.css import CSS
+from hugegraph_llm.utils.graph_index_utils import update_vid_embedding
from hugegraph_llm.utils.log import log
sec = HTTPBearer()
-
def authenticate(credentials: HTTPAuthorizationCredentials = Depends(sec)):
correct_token = admin_settings.user_token
if credentials.credentials != correct_token:
@@ -56,6 +58,33 @@ def authenticate(credentials: HTTPAuthorizationCredentials =
Depends(sec)):
)
+async def timely_update_vid_embedding():
+ while True:
+ try:
+ await asyncio.to_thread(update_vid_embedding)
+ log.info("rebuild_vid_index timely executed successfully.")
+ except asyncio.CancelledError as ce:
+ log.info("Periodic task has been cancelled due to: %s", ce)
+ break
+ except Exception as e:
+ log.error("Failed to execute rebuild_vid_index: %s", e,
exc_info=True)
+ raise Exception("Failed to execute rebuild_vid_index") from e
+ await asyncio.sleep(3600)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI): # pylint: disable=W0621
+ log.info("Starting periodic task...")
+ task = asyncio.create_task(timely_update_vid_embedding())
+ yield
+
+ log.info("Stopping periodic task...")
+ task.cancel()
+ try:
+ await task
+ except asyncio.CancelledError:
+ log.info("Periodic task has been cancelled.")
+
# pylint: disable=C0301
def init_rag_ui() -> gr.Interface:
with gr.Blocks(
@@ -158,7 +187,7 @@ if __name__ == "__main__":
parser.add_argument("--host", type=str, default="0.0.0.0", help="host")
parser.add_argument("--port", type=int, default=8001, help="port")
args = parser.parse_args()
- app = FastAPI()
+ app = FastAPI(lifespan=lifespan)
# we don't need to manually check the env now
# settings.check_env()
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/configs_block.py
b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/configs_block.py
index b45e589..bc66ba5 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/configs_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/configs_block.py
@@ -34,9 +34,9 @@ def test_api_connection(url, method="GET", headers=None,
params=None, body=None,
log.debug("Request URL: %s", url)
try:
if method.upper() == "GET":
- resp = requests.get(url, headers=headers, params=params,
timeout=5, auth=auth)
+ resp = requests.get(url, headers=headers, params=params,
timeout=(1.0, 5.0), auth=auth)
elif method.upper() == "POST":
- resp = requests.post(url, headers=headers, params=params,
json=body, timeout=5, auth=auth)
+ resp = requests.post(url, headers=headers, params=params,
json=body, timeout=(1.0, 5.0), auth=auth)
else:
raise ValueError("Unsupported HTTP method, please use GET/POST
instead")
except requests.exceptions.RequestException as e:
diff --git
a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
index d28e60a..4bb49e3 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
@@ -25,7 +25,7 @@ from hugegraph_llm.config import resource_path, prompt
from hugegraph_llm.utils.graph_index_utils import (
get_graph_index_info,
clean_all_graph_index,
- fit_vid_index,
+ update_vid_embedding,
extract_graph,
import_graph_data,
)
@@ -85,9 +85,9 @@ def create_vector_graph_block():
graph_index_btn1 = gr.Button("Clear Graph Data & Index",
size="sm")
vector_import_bt = gr.Button("Import into Vector", variant="primary")
- graph_index_rebuild_bt = gr.Button("Rebuild vid Index")
graph_extract_bt = gr.Button("Extract Graph Data (1)",
variant="primary")
graph_loading_bt = gr.Button("Load into GraphDB (2)", interactive=True)
+ graph_index_rebuild_bt = gr.Button("Update Vid Embedding")
vector_index_btn0.click(get_vector_index_info, outputs=out).then(
store_prompt,
@@ -109,7 +109,7 @@ def create_vector_graph_block():
store_prompt,
inputs=[input_schema, info_extract_template],
)
- graph_index_rebuild_bt.click(fit_vid_index, outputs=out).then(
+ graph_index_rebuild_bt.click(update_vid_embedding, outputs=out).then(
store_prompt,
inputs=[input_schema, info_extract_template],
)
@@ -119,7 +119,7 @@ def create_vector_graph_block():
extract_graph, inputs=[input_file, input_text, input_schema,
info_extract_template], outputs=[out]
).then(store_prompt, inputs=[input_schema, info_extract_template], )
- graph_loading_bt.click(import_graph_data, inputs=[out, input_schema],
outputs=[out]).then(
+ graph_loading_bt.click(import_graph_data, inputs=[out, input_schema],
outputs=[out]).then(update_vid_embedding).then(
store_prompt,
inputs=[input_schema, info_extract_template],
)
diff --git a/hugegraph-llm/src/hugegraph_llm/models/rerankers/cohere.py
b/hugegraph-llm/src/hugegraph_llm/models/rerankers/cohere.py
index 99aa60d..1710acf 100644
--- a/hugegraph-llm/src/hugegraph_llm/models/rerankers/cohere.py
+++ b/hugegraph-llm/src/hugegraph_llm/models/rerankers/cohere.py
@@ -52,7 +52,7 @@ class CohereReranker:
"top_n": top_n,
"documents": documents,
}
- response = requests.post(url, headers=headers, json=payload,
timeout=60)
+ response = requests.post(url, headers=headers, json=payload,
timeout=(1.0, 10.0))
response.raise_for_status() # Raise an error for bad status codes
results = response.json()["results"]
sorted_docs = [documents[item["index"]] for item in results]
diff --git a/hugegraph-llm/src/hugegraph_llm/models/rerankers/siliconflow.py
b/hugegraph-llm/src/hugegraph_llm/models/rerankers/siliconflow.py
index b4fa14c..d63b0ba 100644
--- a/hugegraph-llm/src/hugegraph_llm/models/rerankers/siliconflow.py
+++ b/hugegraph-llm/src/hugegraph_llm/models/rerankers/siliconflow.py
@@ -53,7 +53,7 @@ class SiliconReranker:
"content-type": Constants.HEADER_CONTENT_TYPE,
"authorization": f"Bearer {self.api_key}",
}
- response = requests.post(url, json=payload, headers=headers,
timeout=60)
+ response = requests.post(url, json=payload, headers=headers,
timeout=(1.0, 10.0))
response.raise_for_status() # Raise an error for bad status codes
results = response.json()["results"]
sorted_docs = [documents[item["index"]] for item in results]
diff --git a/hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py
b/hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py
index 3aa8699..f44fff8 100644
--- a/hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py
+++ b/hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py
@@ -19,7 +19,7 @@
import json
import os
import traceback
-from typing import Dict, Any, Union
+from typing import Dict, Any, Union, Optional
import gradio as gr
@@ -51,20 +51,31 @@ def clean_all_graph_index():
gr.Info("Clean graph index successfully!")
-def extract_graph(input_file, input_text, schema, example_prompt) -> str:
-
- texts = read_documents(input_file, input_text)
- builder = KgBuilder(LLMs().get_chat_llm(), Embeddings().get_embedding(),
get_hg_client())
-
- if schema:
+def parse_schema(schema: str, builder: KgBuilder) -> Optional[str]:
+ schema = schema.strip()
+ if schema.startswith('{'):
try:
- schema = json.loads(schema.strip())
+ schema = json.loads(schema)
builder.import_schema(from_user_defined=schema)
except json.JSONDecodeError:
- log.info("Get schema from graph!")
- builder.import_schema(from_hugegraph=schema)
+ log.error("Invalid JSON format in schema. Please check it again.")
+ return "ERROR: Invalid JSON format in schema. Please check it
carefully."
else:
+ log.info("Get schema '%s' from graphdb.", schema)
+ builder.import_schema(from_hugegraph=schema)
+ return None
+
+
+def extract_graph(input_file, input_text, schema, example_prompt) -> str:
+
+ texts = read_documents(input_file, input_text)
+ builder = KgBuilder(LLMs().get_chat_llm(), Embeddings().get_embedding(),
get_hg_client())
+ if not schema:
return "ERROR: please input with correct schema/format."
+
+ error_message = parse_schema(schema, builder)
+ if error_message:
+ return error_message
builder.chunk_split(texts, "document", "zh").extract_info(example_prompt,
"triples")
try:
@@ -76,7 +87,7 @@ def extract_graph(input_file, input_text, schema,
example_prompt) -> str:
raise gr.Error(str(e))
-def fit_vid_index():
+def update_vid_embedding():
builder = KgBuilder(LLMs().get_chat_llm(), Embeddings().get_embedding(),
get_hg_client())
builder.fetch_graph_data().build_vertex_id_semantic_index()
log.debug("Operators: %s", builder.operators)
@@ -96,12 +107,9 @@ def import_graph_data(data: str, schema: str) -> Union[str,
Dict[str, Any]]:
log.debug("Import graph data: %s", data)
builder = KgBuilder(LLMs().get_chat_llm(),
Embeddings().get_embedding(), get_hg_client())
if schema:
- try:
- schema = json.loads(schema.strip())
- builder.import_schema(from_user_defined=schema)
- except json.JSONDecodeError:
- log.info("Get schema from graph!")
- builder.import_schema(from_hugegraph=schema)
+ error_message = parse_schema(schema, builder)
+ if error_message:
+ return error_message
context = builder.commit_to_hugegraph().run(data_json)
gr.Info("Import graph data successfully!")
diff --git a/hugegraph-python-client/src/pyhugegraph/client.py
b/hugegraph-python-client/src/pyhugegraph/client.py
index 17c2938..1f1104b 100644
--- a/hugegraph-python-client/src/pyhugegraph/client.py
+++ b/hugegraph-python-client/src/pyhugegraph/client.py
@@ -54,9 +54,9 @@ class PyHugeClient:
user: str,
pwd: str,
graphspace: Optional[str] = None,
- timeout: int = 10,
+ timeout: Optional[tuple[float, float]] = None
):
- self.cfg = HGraphConfig(ip, port, user, pwd, graph, graphspace,
timeout)
+ self.cfg = HGraphConfig(ip, port, user, pwd, graph, graphspace,
timeout or (0.5, 15.0))
@manager_builder
def schema(self) -> "SchemaManager":
diff --git a/hugegraph-python-client/src/pyhugegraph/utils/huge_config.py
b/hugegraph-python-client/src/pyhugegraph/utils/huge_config.py
index f7cb2d9..4794b7e 100644
--- a/hugegraph-python-client/src/pyhugegraph/utils/huge_config.py
+++ b/hugegraph-python-client/src/pyhugegraph/utils/huge_config.py
@@ -32,7 +32,7 @@ class HGraphConfig:
password: str
graph_name: str
graphspace: Optional[str] = None
- timeout: int = 10
+ timeout: tuple[float, float] = (0.5, 15.0)
gs_supported: bool = field(default=False, init=False)
version: List[int] = field(default_factory=list)
@@ -44,7 +44,7 @@ class HGraphConfig:
else:
try:
response = requests.get(
- f"http://{self.ip}:{self.port}/versions", timeout=1
+ f"http://{self.ip}:{self.port}/versions", timeout=0.5
)
core = response.json()["versions"]["core"]
log.info( # pylint: disable=logging-fstring-interpolation
diff --git a/hugegraph-python-client/src/pyhugegraph/utils/huge_requests.py
b/hugegraph-python-client/src/pyhugegraph/utils/huge_requests.py
index 6951064..6f88a7b 100644
--- a/hugegraph-python-client/src/pyhugegraph/utils/huge_requests.py
+++ b/hugegraph-python-client/src/pyhugegraph/utils/huge_requests.py
@@ -33,8 +33,8 @@ class HGraphSession:
def __init__(
self,
cfg: HGraphConfig,
- retries: int = 5,
- backoff_factor: int = 1,
+ retries: int = 3,
+ backoff_factor: int = 0.1,
status_forcelist=(500, 502, 504),
session: Optional[requests.Session] = None,
):