This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
The following commit(s) were added to refs/heads/main by this push:
new 4c97cd6 feat(llm): automatic backup graph data timely (#151)
4c97cd6 is described below
commit 4c97cd65999e551aaf0ee5493b179a2b1a91713e
Author: SoJGooo <[email protected]>
AuthorDate: Thu Jan 2 19:07:31 2025 +0800
feat(llm): automatic backup graph data timely (#151)
TODO:
Add a button that lets the user run the backup manually
---------
Co-authored-by: imbajin <[email protected]>
---
.gitattributes | 1 +
hugegraph-llm/requirements.txt | 1 +
.../src/hugegraph_llm/demo/rag_demo/app.py | 33 +-----
.../src/hugegraph_llm/demo/rag_demo/other_block.py | 118 +++++++++++++++++++++
.../pyhugegraph/api/schema_manage/edge_label.py | 9 ++
5 files changed, 131 insertions(+), 31 deletions(-)
diff --git a/.gitattributes b/.gitattributes
index 333c08e..948dfc7 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -9,6 +9,7 @@ apache-release.sh export-ignore
*.docx export-ignore
# ignored directory
+hugegraph-ml/ export-ignore
.github/ export-ignore
.idea/ export-ignore
style/ export-ignore
diff --git a/hugegraph-llm/requirements.txt b/hugegraph-llm/requirements.txt
index 9cc8c01..5f369f8 100644
--- a/hugegraph-llm/requirements.txt
+++ b/hugegraph-llm/requirements.txt
@@ -15,3 +15,4 @@ pyarrow~=17.0.0 # TODO: a temporary dependency for pandas,
figure out why Import
pandas~=2.2.2
openpyxl~=3.1.5
pydantic-settings~=2.6.1
+apscheduler~=3.10.4
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
index e0508dd..bcc1198 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
@@ -15,10 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-
import argparse
-import asyncio
-from contextlib import asynccontextmanager
import gradio as gr
import uvicorn
@@ -37,15 +34,16 @@ from hugegraph_llm.demo.rag_demo.configs_block import (
apply_graph_config,
)
from hugegraph_llm.demo.rag_demo.other_block import create_other_block
+from hugegraph_llm.demo.rag_demo.other_block import lifespan
from hugegraph_llm.demo.rag_demo.rag_block import create_rag_block, rag_answer
from hugegraph_llm.demo.rag_demo.text2gremlin_block import
create_text2gremlin_block, graph_rag_recall
from hugegraph_llm.demo.rag_demo.vector_graph_block import
create_vector_graph_block
from hugegraph_llm.resources.demo.css import CSS
-from hugegraph_llm.utils.graph_index_utils import update_vid_embedding
from hugegraph_llm.utils.log import log
# Bearer-token extractor; injected into authenticate() below via Depends(sec).
sec = HTTPBearer()
+
def authenticate(credentials: HTTPAuthorizationCredentials = Depends(sec)):
correct_token = admin_settings.user_token
if credentials.credentials != correct_token:
@@ -57,33 +55,6 @@ def authenticate(credentials: HTTPAuthorizationCredentials =
Depends(sec)):
headers={"WWW-Authenticate": "Bearer"},
)
- # TODO: move the logic to a separate file
-async def timely_update_vid_embedding():
- while True:
- try:
- await asyncio.to_thread(update_vid_embedding)
- log.info("rebuild_vid_index timely executed successfully.")
- except asyncio.CancelledError as ce:
- log.info("Periodic task has been cancelled due to: %s", ce)
- break
- except Exception as e:
- log.error("Failed to execute rebuild_vid_index: %s", e,
exc_info=True)
- raise Exception("Failed to execute rebuild_vid_index") from e
- await asyncio.sleep(3600)
-
-
-@asynccontextmanager
-async def lifespan(app: FastAPI): # pylint: disable=W0621
- log.info("Starting periodic task...")
- task = asyncio.create_task(timely_update_vid_embedding())
- yield
-
- log.info("Stopping periodic task...")
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- log.info("Periodic task has been cancelled.")
# pylint: disable=C0301
def init_rag_ui() -> gr.Interface:
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
index a156442..143da2b 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
@@ -15,9 +15,28 @@
# specific language governing permissions and limitations
# under the License.
+import asyncio
+import json
+import os
+import shutil
+from contextlib import asynccontextmanager
+from datetime import datetime
+
import gradio as gr
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.cron import CronTrigger
+from fastapi import FastAPI
+from hugegraph_llm.config import huge_settings, resource_path
+from hugegraph_llm.utils.graph_index_utils import update_vid_embedding
from hugegraph_llm.utils.hugegraph_utils import init_hg_test_data,
run_gremlin_query
+from hugegraph_llm.utils.log import log
+from pyhugegraph.client import PyHugeClient
+
# How many timestamped snapshot directories to keep under BACKUP_DIR.
MAX_BACKUP_DIRS = 7
# Upper bounds on how many vertices / edges a single snapshot exports.
MAX_VERTICES = 100_000
MAX_EDGES = 200_000
# Root directory for snapshots of the configured graph.
# NOTE(review): evaluated once at import time; if huge_settings.graph_name is
# changed later, this path is not recomputed.
BACKUP_DIR = os.path.join(resource_path, huge_settings.graph_name, "backup")
def create_other_block():
@@ -35,3 +54,102 @@ def create_other_block():
out = gr.Textbox(label="Init Graph Demo Result",
show_copy_button=True)
btn = gr.Button("(BETA) Init HugeGraph test data (🚧)")
btn.click(fn=init_hg_test_data, inputs=inp, outputs=out) # pylint:
disable=no-member
+
+
def create_dir_safely(path):
    """Create directory ``path`` (and any missing parents) if it does not exist.

    ``exist_ok=True`` makes this race-free: the previous ``exists()`` check
    followed by ``makedirs()`` could raise ``FileExistsError`` when another
    thread/process created the directory between the two calls.

    :param path: directory path to create.
    :raises FileExistsError: if ``path`` already exists but is not a directory
        (fail fast instead of letting a later ``open()`` inside it fail).
    :raises OSError: for other filesystem errors such as missing permissions.
    """
    os.makedirs(path, exist_ok=True)
+
+# TODO: move the logic to a separate file
def backup_data():
    """Snapshot the configured graph into a timestamped directory under BACKUP_DIR.

    Writes three JSON files into ``BACKUP_DIR/<YYYYmmdd_HHMMSS>/``:

    * ``vertices.json`` - ``data`` of ``g.V().limit(MAX_VERTICES)``
    * ``edges.json``    - ``data`` of ``g.E().limit(MAX_EDGES)``; full edges are
      exported (not just ``g.E().id()``) so labels and properties survive
    * ``schema.json``   - the result of ``client.schema().getSchema()``

    Each file's data is fetched before the file is opened, so a failing query
    never leaves an empty file behind. If any step fails, a snapshot directory
    created by this call is removed, so an incomplete snapshot is never counted
    by ``manage_backup_retention`` as a valid backup. After a complete snapshot,
    old backups are pruned via ``manage_backup_retention``.

    :raises Exception: wrapping the original error if any step fails.
    """
    backup_subdir = None
    created_subdir = False
    snapshot_complete = False
    try:
        client = PyHugeClient(
            huge_settings.graph_ip,
            huge_settings.graph_port,
            huge_settings.graph_name,
            huge_settings.graph_user,
            huge_settings.graph_pwd,
            huge_settings.graph_space,
        )

        create_dir_safely(BACKUP_DIR)

        date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_subdir = os.path.join(BACKUP_DIR, date_str)
        # Only clean up on failure what this call created (guards against a
        # same-second name collision with an existing, complete snapshot).
        created_subdir = not os.path.isdir(backup_subdir)
        create_dir_safely(backup_subdir)

        # Zero-arg fetchers: data is pulled lazily, one file at a time, so only
        # one result set is held in memory at once.
        fetchers = {
            "vertices.json": lambda: client.gremlin().exec(f"g.V().limit({MAX_VERTICES})")["data"],
            "edges.json": lambda: client.gremlin().exec(f"g.E().limit({MAX_EDGES})")["data"],
            "schema.json": client.schema().getSchema,
        }

        for filename, fetch in fetchers.items():
            data = fetch()
            with open(os.path.join(backup_subdir, filename), "w", encoding="utf-8") as f:
                json.dump(data, f)
        snapshot_complete = True

        log.info("Backup completed successfully in %s.", backup_subdir)
        manage_backup_retention()
    except Exception as e:
        log.critical("Backup failed: %s", e, exc_info=True)
        if created_subdir and not snapshot_complete:
            shutil.rmtree(backup_subdir, ignore_errors=True)
        raise Exception("Failed to execute backup") from e
+
+
def manage_backup_retention():
    """Delete the oldest snapshot directories so at most MAX_BACKUP_DIRS remain.

    Only subdirectories of BACKUP_DIR whose names parse as the snapshot format
    written by ``backup_data`` (``%Y%m%d_%H%M%S``) are considered, so any other
    directory placed there is never deleted. Snapshots are ordered by the
    timestamp encoded in their name rather than ``os.path.getctime``: on POSIX,
    ctime is the inode *change* time (bumped by chmod, entry updates, ...), not
    the creation time, so it can misorder snapshots.

    :raises Exception: wrapping the original error if listing or deletion fails.
    """
    try:
        snapshots = []
        for name in os.listdir(BACKUP_DIR):
            path = os.path.join(BACKUP_DIR, name)
            if not os.path.isdir(path):
                continue
            try:
                stamp = datetime.strptime(name, "%Y%m%d_%H%M%S")
            except ValueError:
                continue  # not created by backup_data; leave it untouched
            snapshots.append((stamp, path))
        snapshots.sort()  # oldest first

        # Explicit count (not a negative slice) so MAX_BACKUP_DIRS == 0 works too.
        excess = max(len(snapshots) - MAX_BACKUP_DIRS, 0)
        for _, old_backup in snapshots[:excess]:
            shutil.rmtree(old_backup)
            log.info("Deleted old backup: %s", old_backup)
    except Exception as e:
        log.error("Failed to manage backup retention: %s", e, exc_info=True)
        raise Exception("Failed to manage backup retention") from e
+
+
async def timely_update_vid_embedding(interval_seconds: int = 3600):
    """Periodically rebuild the vertex-id embedding index.

    ``update_vid_embedding`` is blocking, so it runs in a worker thread via
    ``asyncio.to_thread`` to keep the event loop responsive. The coroutine then
    sleeps ``interval_seconds`` and repeats.

    A failed run is logged and retried on the next cycle instead of being
    raised. This coroutine runs as a fire-and-forget background task, so a
    raised exception would silently stop all future refreshes and only
    resurface when the task is awaited at shutdown.

    Cancellation is logged and re-raised, so the task ends in the cancelled
    state as asyncio expects. A worker thread that is already running is not
    interrupted by cancellation.

    :param interval_seconds: delay between runs, in seconds (default: 3600).
    """
    while True:
        try:
            await asyncio.to_thread(update_vid_embedding)
            log.info("rebuild_vid_index timely executed successfully.")
        except asyncio.CancelledError as ce:
            log.info("Periodic task has been cancelled due to: %s", ce)
            raise
        except Exception as e:  # pylint: disable=broad-exception-caught
            log.error("Failed to execute rebuild_vid_index: %s", e, exc_info=True)
        await asyncio.sleep(interval_seconds)
+
+
@asynccontextmanager
async def lifespan(app: FastAPI):  # pylint: disable=W0621
    """FastAPI lifespan hook that owns the app's background jobs.

    Startup:
    * starts an ``AsyncIOScheduler`` with a daily ``backup_data`` cron job;
    * launches ``timely_update_vid_embedding`` as an asyncio task.

    Shutdown:
    * cancels the embedding task and waits for it;
    * shuts the scheduler down.

    Shutdown steps run in ``finally``, so they happen even if an exception is
    thrown into the context at ``yield``. Any non-cancellation error the
    embedding task ended with is logged rather than propagated. Otherwise
    ``await embedding_task`` would re-raise it and skip ``scheduler.shutdown()``.

    :param app: the FastAPI application (unused; required by the lifespan protocol).
    """
    log.info("Starting background scheduler...")
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        backup_data,
        # NOTE(review): 14:16 looks like an ad-hoc value; confirm the intended
        # daily backup time (presumably interpreted in the host's local time).
        trigger=CronTrigger(hour=14, minute=16),
        id="daily_backup",
        replace_existing=True,
    )
    scheduler.start()

    log.info("Starting vid embedding update task...")
    embedding_task = asyncio.create_task(timely_update_vid_embedding())
    try:
        yield
    finally:
        log.info("Stopping vid embedding update task...")
        embedding_task.cancel()
        try:
            await embedding_task
        except asyncio.CancelledError:
            log.info("Vid embedding update task cancelled.")
        except Exception as e:  # pylint: disable=broad-exception-caught
            log.error("Vid embedding update task had failed: %s", e, exc_info=True)

        log.info("Shutting down background scheduler...")
        scheduler.shutdown()
diff --git
a/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py
b/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py
index 636a564..93f2180 100644
--- a/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py
+++ b/hugegraph-python-client/src/pyhugegraph/api/schema_manage/edge_label.py
@@ -87,6 +87,15 @@ class EdgeLabel(HugeParamsBase):
self._parameter_holder.set("not_exist", False)
return self
@decorator_params
def enableLabelIndex(self, flag) -> "EdgeLabel":
    """Record the ``enable_label_index`` option for the edge label being built.

    ``flag`` is stored as-is in the parameter holder under the key
    ``enable_label_index``. ``self`` is returned so builder calls can be chained.
    Default is False. NOTE(review): confirm this default and what the label
    index enables against the HugeGraph server documentation.
    """
    holder = self._parameter_holder
    holder.set("enable_label_index", flag)
    return self
+
@decorator_create
def create(self):
dic = self._parameter_holder.get_dic()