This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
The following commit(s) were added to refs/heads/main by this push:
new 820bfb2 refactor(llm): add a button to backup data & count together (#153)
820bfb2 is described below
commit 820bfb27ec4d2068ec43ccd1f980d2e3fed9392c
Author: SoJGooo <[email protected]>
AuthorDate: Fri Jan 3 18:56:48 2025 +0800
refactor(llm): add a button to backup data & count together (#153)
1. add a button to backup data
2. refactor backup_data
3. refactor timely_update_vid_embedding
4. return count & graph elements together
---------
Co-authored-by: imbajin <[email protected]>
---
.gitattributes | 1 +
hugegraph-llm/.gitignore | 1 +
.../src/hugegraph_llm/demo/rag_demo/other_block.py | 94 ++--------------------
.../src/hugegraph_llm/demo/rag_demo/rag_block.py | 1 +
.../demo/rag_demo/vector_graph_block.py | 15 ++++
.../src/hugegraph_llm/utils/hugegraph_utils.py | 67 ++++++++++++++-
6 files changed, 92 insertions(+), 87 deletions(-)
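
The core of this change is a single Gremlin traversal that returns the element
count and the elements themselves in one round trip. As a rough sketch of
exercising it with PyHugeClient (the connection values below are placeholders,
not the project's defaults):

    from pyhugegraph.client import PyHugeClient

    # Hypothetical connection settings; substitute your deployment's values.
    client = PyHugeClient("127.0.0.1", "8080", "hugegraph", "admin", "pwd")

    # aggregate() side-collects the traversed vertices, count() folds the
    # stream into a single number, and select() emits both in one result map.
    query = ("g.V().limit(100).aggregate('vertices')"
             ".count().as('count').select('count','vertices')")
    data = client.gremlin().exec(query)["data"]
    print(data)
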
diff --git a/.gitattributes b/.gitattributes
index 948dfc7..4a365f0 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -14,3 +14,4 @@ hugegraph-ml/ export-ignore
.idea/ export-ignore
style/ export-ignore
scripts/ export-ignore
+hugegraph-llm/src/hugegraph_llm/resources/backup-graph-data-4020/ export-ignore
diff --git a/hugegraph-llm/.gitignore b/hugegraph-llm/.gitignore
index 4de6eba..1740bd2 100644
--- a/hugegraph-llm/.gitignore
+++ b/hugegraph-llm/.gitignore
@@ -1,2 +1,3 @@
src/hugegraph_llm/resources/demo/questions_answers.xlsx
src/hugegraph_llm/resources/demo/questions.xlsx
+src/hugegraph_llm/resources/backup-graph-data-4020/
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
index 143da2b..da10f50 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/other_block.py
@@ -16,27 +16,16 @@
# under the License.
import asyncio
-import json
-import os
-import shutil
from contextlib import asynccontextmanager
-from datetime import datetime
import gradio as gr
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI
-from hugegraph_llm.config import huge_settings, resource_path
-from hugegraph_llm.utils.graph_index_utils import update_vid_embedding
-from hugegraph_llm.utils.hugegraph_utils import init_hg_test_data, run_gremlin_query
+from hugegraph_llm.utils.hugegraph_utils import init_hg_test_data, run_gremlin_query, backup_data
from hugegraph_llm.utils.log import log
-from pyhugegraph.client import PyHugeClient
-
-MAX_BACKUP_DIRS = 7
-MAX_VERTICES = 100000
-MAX_EDGES = 200000
-BACKUP_DIR = str(os.path.join(resource_path, huge_settings.graph_name, "backup"))
+from hugegraph_llm.demo.rag_demo.vector_graph_block import timely_update_vid_embedding
def create_other_block():
@@ -48,6 +37,11 @@ def create_other_block():
btn.click(fn=run_gremlin_query, inputs=[inp], outputs=out) # pylint: disable=no-member
gr.Markdown("---")
+ with gr.Row():
+ inp = []
+ out = gr.Textbox(label="Backup Graph Manually (Auto backup at 1:00 AM everyday)", show_copy_button=True)
+ btn = gr.Button("Backup Graph Data")
+ btn.click(fn=backup_data, inputs=inp, outputs=out) # pylint: disable=no-member
with gr.Accordion("Init HugeGraph test data (🚧)", open=False):
with gr.Row():
inp = []
@@ -56,85 +50,13 @@ def create_other_block():
btn.click(fn=init_hg_test_data, inputs=inp, outputs=out) # pylint: disable=no-member
-def create_dir_safely(path):
- if not os.path.exists(path):
- os.makedirs(path)
-
-# TODO: move the logic to a separate file
-def backup_data():
- try:
- client = PyHugeClient(
- huge_settings.graph_ip,
- huge_settings.graph_port,
- huge_settings.graph_name,
- huge_settings.graph_user,
- huge_settings.graph_pwd,
- huge_settings.graph_space,
- )
-
- create_dir_safely(BACKUP_DIR)
-
- date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
- backup_subdir = os.path.join(BACKUP_DIR, f"{date_str}")
- create_dir_safely(backup_subdir)
-
- files = {
- "vertices.json": f"g.V().limit({MAX_VERTICES})",
- "edges.json": f"g.E().id().limit({MAX_EDGES})",
- "schema.json": client.schema().getSchema()
- }
-
- for filename, query in files.items():
- with open(os.path.join(backup_subdir, filename), "w", encoding="utf-8") as f:
- data = client.gremlin().exec(query)["data"] if "schema" not in filename else query
- json.dump(data, f)
-
- log.info("Backup completed successfully in %s.", backup_subdir)
- manage_backup_retention()
- except Exception as e:
- log.critical("Backup failed: %s", e, exc_info=True)
- raise Exception("Failed to execute backup") from e
-
-
-def manage_backup_retention():
- try:
- backup_dirs = [
- os.path.join(BACKUP_DIR, d)
- for d in os.listdir(BACKUP_DIR)
- if os.path.isdir(os.path.join(BACKUP_DIR, d))
- ]
- backup_dirs.sort(key=os.path.getctime)
-
- while len(backup_dirs) > MAX_BACKUP_DIRS:
- old_backup = backup_dirs.pop(0)
- shutil.rmtree(old_backup)
- log.info("Deleted old backup: %s", old_backup)
- except Exception as e:
- log.error("Failed to manage backup retention: %s", e, exc_info=True)
- raise Exception("Failed to manage backup retention") from e
-
-
-async def timely_update_vid_embedding():
- while True:
- try:
- await asyncio.to_thread(update_vid_embedding)
- log.info("rebuild_vid_index timely executed successfully.")
- except asyncio.CancelledError as ce:
- log.info("Periodic task has been cancelled due to: %s", ce)
- break
- except Exception as e:
- log.error("Failed to execute rebuild_vid_index: %s", e,
exc_info=True)
- raise Exception("Failed to execute rebuild_vid_index") from e
- await asyncio.sleep(3600)
-
-
@asynccontextmanager
async def lifespan(app: FastAPI): # pylint: disable=W0621
log.info("Starting background scheduler...")
scheduler = AsyncIOScheduler()
scheduler.add_job(
backup_data,
- trigger=CronTrigger(hour=14, minute=16),
+ trigger=CronTrigger(hour=1, minute=0),
id="daily_backup",
replace_existing=True
)
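
For readers unfamiliar with the scheduling code above, here is a minimal,
self-contained sketch of the same pattern (an APScheduler AsyncIOScheduler
driven from a FastAPI lifespan); the print-based job body is only a stand-in
for the real backup_data:

    from contextlib import asynccontextmanager

    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.triggers.cron import CronTrigger
    from fastapi import FastAPI

    def backup_data():
        # Stand-in job body; the real one snapshots the graph to disk.
        print("backup triggered")

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        scheduler = AsyncIOScheduler()
        # Fire daily at 01:00 server-local time (pass timezone=... to pin it).
        scheduler.add_job(backup_data, trigger=CronTrigger(hour=1, minute=0),
                          id="daily_backup", replace_existing=True)
        scheduler.start()
        try:
            yield
        finally:
            scheduler.shutdown()

    app = FastAPI(lifespan=lifespan)
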
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py
index c10f84b..822b080 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/rag_block.py
@@ -198,6 +198,7 @@ def create_rag_block():
"Graph-only Answer",
"Graph-Vector Answer",
]
+ # FIXME: "demo" might conflict with the graph name, it should be modified.
answers_path = os.path.join(resource_path, "demo", "questions_answers.xlsx")
questions_path = os.path.join(resource_path, "demo", "questions.xlsx")
questions_template_path = os.path.join(resource_path, "demo", "questions_template.xlsx")
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
index 1e48e21..a78835f 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
@@ -17,6 +17,7 @@
# pylint: disable=E1101
+import asyncio
import gradio as gr
from hugegraph_llm.config import prompt
@@ -28,6 +29,7 @@ from hugegraph_llm.utils.graph_index_utils import (
import_graph_data,
)
from hugegraph_llm.utils.vector_index_utils import clean_vector_index, build_vector_index, get_vector_index_info
+from hugegraph_llm.utils.log import log
def store_prompt(doc, schema, example_prompt):
# update env variables: doc, schema and example_prompt
@@ -139,3 +141,16 @@ def create_vector_graph_block():
tab_upload_text.select(fn=on_tab_select, inputs=[input_file, input_text], outputs=[input_file, input_text])
return input_text, input_schema, info_extract_template
+
+async def timely_update_vid_embedding():
+ while True:
+ try:
+ await asyncio.to_thread(update_vid_embedding)
+ log.info("rebuild_vid_index timely executed successfully.")
+ except asyncio.CancelledError as ce:
+ log.info("Periodic task has been cancelled due to: %s", ce)
+ break
+ except Exception as e:
+ log.error("Failed to execute rebuild_vid_index: %s", e,
exc_info=True)
+ raise Exception("Failed to execute rebuild_vid_index") from e
+ await asyncio.sleep(3600)
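
As a usage note, timely_update_vid_embedding is a long-running coroutine; a
minimal sketch of driving it as a cancellable background task (assuming the
import path above):

    import asyncio

    from hugegraph_llm.demo.rag_demo.vector_graph_block import timely_update_vid_embedding

    async def main():
        # Run the hourly refresher in the background; cancelling the task
        # hits the CancelledError branch above and exits the loop cleanly.
        task = asyncio.create_task(timely_update_vid_embedding())
        await asyncio.sleep(5)  # stand-in for the application's lifetime
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)

    asyncio.run(main())
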
diff --git a/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py b/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
index 7dc69cc..53fccdd 100644
--- a/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
+++ b/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
@@ -14,11 +14,21 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
import json
+import os
+import shutil
+from datetime import datetime
-from hugegraph_llm.config import huge_settings
+from hugegraph_llm.config import huge_settings, resource_path
+from hugegraph_llm.utils.log import log
from pyhugegraph.client import PyHugeClient
+MAX_BACKUP_DIRS = 7
+MAX_VERTICES = 100000
+MAX_EDGES = 200000
+BACKUP_DIR = str(os.path.join(resource_path, "backup-graph-data-4020", huge_settings.graph_name))
+
def run_gremlin_query(query, fmt=True):
res = get_hg_client().gremlin().exec(query)
@@ -81,3 +91,58 @@ def init_hg_test_data():
def clean_hg_data():
client = get_hg_client()
client.graphs().clear_graph_all_data()
+
+
+def create_dir_safely(path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+
+def backup_data():
+ try:
+ client = get_hg_client()
+
+ create_dir_safely(BACKUP_DIR)
+
+ date_str = datetime.now().strftime("%Y%m%d")
+ backup_subdir = os.path.join(BACKUP_DIR, f"{date_str}")
+ create_dir_safely(backup_subdir)
+
+
+ files = {
+ "vertices.json": f"g.V().limit({MAX_VERTICES})"
+
f".aggregate('vertices').count().as('count').select('count','vertices')",
+ "edges.json":
f"g.E().limit({MAX_EDGES}).aggregate('edges').count().as('count').select('count','edges')",
+ "schema.json": client.schema().getSchema()
+ }
+
+ for filename, query in files.items():
+ with open(os.path.join(backup_subdir, filename), "w", encoding="utf-8") as f:
+ data = client.gremlin().exec(query)["data"] if "schema" not in filename else query
+ json.dump(data, f, ensure_ascii=False)
+
+ log.info("Backup completed successfully in %s.", backup_subdir)
+ del_info = manage_backup_retention()
+ return f"Backup completed successfully in {backup_subdir} \n{del_info}"
+ except Exception as e: # pylint: disable=W0718
+ log.critical("Backup failed: %s", e, exc_info=True)
+ raise Exception("Failed to execute backup") from e
+
+
+def manage_backup_retention():
+ try:
+ backup_dirs = [
+ os.path.join(BACKUP_DIR, d)
+ for d in os.listdir(BACKUP_DIR)
+ if os.path.isdir(os.path.join(BACKUP_DIR, d))
+ ]
+ backup_dirs.sort(key=os.path.getctime)
+ if len(backup_dirs) > MAX_BACKUP_DIRS:
+ old_backup = backup_dirs.pop(0)
+ shutil.rmtree(old_backup)
+ log.info("Deleted old backup: %s", old_backup)
+ return f"Deleted old backup: {old_backup}"
+ return f"The current number of backup files <= {MAX_BACKUP_DIRS}, so
no files are deleted"
+ except Exception as e: # pylint: disable=W0718
+ log.error("Failed to manage backup retention: %s", e, exc_info=True)
+ raise Exception("Failed to manage backup retention") from e