This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
The following commit(s) were added to refs/heads/main by this push:
new f6b547c refactor(llm): change QPS -> RPM for timer decorator (#241)
f6b547c is described below
commit f6b547c81fbf7bcdd5bf8349d4842cfebd0b3eff
Author: Ethereal-O <[email protected]>
AuthorDate: Tue May 20 19:47:41 2025 +0800
refactor(llm): change QPS -> RPM for timer decorator (#241)
What did I do:
1. Rename the function `record_qps` to `record_rpm`, which now logs RPM
(requests per minute) instead of QPS, and also supports async functions.
2. Replace every usage of `record_qps` with `record_rpm` so that the
reported request rate stays accurate.
3. Testing: verified with the `hugegraph_llm.demo.rag_demo.app` example
that RPM is calculated correctly.
4. Testing: simulated unexpected situations, such as incorrect
configuration items, to ensure that this function does not raise
errors.
close #229
---------
Co-authored-by: imbajin <[email protected]>
---
.../src/hugegraph_llm/operators/graph_rag_task.py | 4 +--
.../operators/gremlin_generate_task.py | 4 +--
.../operators/kg_construction_task.py | 4 +--
.../src/hugegraph_llm/utils/decorators.py | 36 +++++++++++++++-------
4 files changed, 31 insertions(+), 17 deletions(-)
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py
b/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py
index 9f3d64d..65c95db 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/graph_rag_task.py
@@ -31,7 +31,7 @@ from hugegraph_llm.operators.index_op.semantic_id_query
import SemanticIdQuery
from hugegraph_llm.operators.index_op.vector_index_query import
VectorIndexQuery
from hugegraph_llm.operators.llm_op.answer_synthesize import AnswerSynthesize
from hugegraph_llm.operators.llm_op.keyword_extract import KeywordExtract
-from hugegraph_llm.utils.decorators import log_time, log_operator_time,
record_qps
+from hugegraph_llm.utils.decorators import log_time, log_operator_time,
record_rpm
from hugegraph_llm.config import prompt, huge_settings
@@ -235,7 +235,7 @@ class RAGPipeline:
return self
@log_time("total time")
- @record_qps
+ @record_rpm
def run(self, **kwargs) -> Dict[str, Any]:
"""
Execute all operators in the pipeline in sequence.
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
b/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
index 95ce59f..70f3d27 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
@@ -24,7 +24,7 @@ from hugegraph_llm.operators.hugegraph_op.schema_manager
import SchemaManager
from hugegraph_llm.operators.index_op.build_gremlin_example_index import
BuildGremlinExampleIndex
from hugegraph_llm.operators.index_op.gremlin_example_index_query import
GremlinExampleIndexQuery
from hugegraph_llm.operators.llm_op.gremlin_generate import
GremlinGenerateSynthesize
-from hugegraph_llm.utils.decorators import log_time, log_operator_time,
record_qps
+from hugegraph_llm.utils.decorators import log_time, log_operator_time,
record_rpm
class GremlinGenerator:
@@ -69,7 +69,7 @@ class GremlinGenerator:
return self
@log_time("total time")
- @record_qps
+ @record_rpm
def run(self, **kwargs):
context = kwargs
for operator in self.operators:
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py
b/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py
index 2fb5966..a736751 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/kg_construction_task.py
@@ -31,7 +31,7 @@ from hugegraph_llm.operators.index_op.build_vector_index
import BuildVectorIndex
from hugegraph_llm.operators.llm_op.disambiguate_data import DisambiguateData
from hugegraph_llm.operators.llm_op.info_extract import InfoExtract
from hugegraph_llm.operators.llm_op.property_graph_extract import
PropertyGraphExtract
-from hugegraph_llm.utils.decorators import log_time, log_operator_time,
record_qps
+from hugegraph_llm.utils.decorators import log_time, log_operator_time,
record_rpm
from pyhugegraph.client import PyHugeClient
@@ -97,7 +97,7 @@ class KgBuilder:
return self
@log_time("total time")
- @record_qps
+ @record_rpm
def run(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
for operator in self.operators:
context = self._run_operator(operator, context)
diff --git a/hugegraph-llm/src/hugegraph_llm/utils/decorators.py
b/hugegraph-llm/src/hugegraph_llm/utils/decorators.py
index 7ffda08..b07de6f 100644
--- a/hugegraph-llm/src/hugegraph_llm/utils/decorators.py
+++ b/hugegraph-llm/src/hugegraph_llm/utils/decorators.py
@@ -15,8 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-import time
import asyncio
+import time
from functools import wraps
from typing import Optional, Any, Callable
@@ -74,22 +74,37 @@ def log_operator_time(func: Callable) -> Callable:
log.debug("Operator %s finished in %.2f seconds",
operator.__class__.__name__, op_time)
# log.debug("Current context:\n%s", result)
return result
+
return wrapper
-def record_qps(func: Callable) -> Callable:
+def record_rpm(func: Callable) -> Callable:
@wraps(func)
- def wrapper(*args: Any, **kwargs: Any) -> Any:
+ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
+ start = time.perf_counter()
+ result = await func(*args, **kwargs)
+ call_count = result.get("call_count", 0)
+ elapsed_time = time.perf_counter() - start
+ rpm = (call_count / elapsed_time * 60) if elapsed_time > 0 else 0
+ if rpm >= 1:
+ log.debug("%s RPM: %.1f/min", args[0].__class__.__name__, rpm)
+ return result
+
+ @wraps(func)
+ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
start = time.perf_counter()
result = func(*args, **kwargs)
call_count = result.get("call_count", 0)
- qps = call_count / (time.perf_counter() - start)
- if qps >= 0.10:
- log.debug("%s QPS: %.2f/s", args[0].__class__.__name__, qps)
- else:
- log.debug("%s QPS: %f/s", args[0].__class__.__name__, qps)
+ elapsed_time = time.perf_counter() - start
+ rpm = (call_count / elapsed_time * 60) if elapsed_time > 0 else 0
+ if rpm >= 1:
+ log.debug("%s RPM: %.1f/min", args[0].__class__.__name__, rpm)
return result
- return wrapper
+
+ if asyncio.iscoroutinefunction(func):
+ return async_wrapper
+ return sync_wrapper
+
def with_task_id(func: Callable) -> Callable:
def wrapper(*args: Any, **kwargs: Any) -> Any:
@@ -99,11 +114,10 @@ def with_task_id(func: Callable) -> Callable:
# Store the original return value
result = func(*args, **kwargs)
-
# Add the task_id to the function's context
if hasattr(result, "__closure__") and result.__closure__:
# If it's a closure, we can add the task_id to its context
setattr(result, "task_id", task_id)
-
return result
+
return wrapper