wenjin272 commented on code in PR #81:
URL: https://github.com/apache/flink-agents/pull/81#discussion_r2247967712
##########
python/flink_agents/plan/action.py:
##########
@@ -41,40 +49,109 @@ class Action(BaseModel):
"""
name: str
- #TODO: Raise a warning when the action has a return value, as it will be
ignored.
+ # TODO: Raise a warning when the action has a return value, as it will be
ignored.
exec: Function
listen_event_types: List[str]
- @field_serializer('exec')
+ @field_serializer("exec")
def __serialize_exec(self, exec: Function) -> dict:
# append meta info to help deserialize exec
data = exec.model_dump()
- data['func_type'] = exec.__class__.__qualname__
+ data["func_type"] = exec.__class__.__qualname__
return data
- @model_validator(mode='before')
- def __custom_deserialize(self) -> 'Action':
- exec = self['exec']
+ @model_validator(mode="before")
+ def __custom_deserialize(self) -> "Action":
+ exec = self["exec"]
# restore exec from serialized json.
if isinstance(exec, dict):
- func_type = exec['func_type']
- if func_type == 'PythonFunction':
- self['exec'] = PythonFunction(**exec)
- elif func_type == 'JavaFunction':
- self['exec'] = JavaFunction(**exec)
+ func_type = exec["func_type"]
+ if func_type == "PythonFunction":
+ self["exec"] = PythonFunction(**exec)
+ elif func_type == "JavaFunction":
+ self["exec"] = JavaFunction(**exec)
else:
- err_msg = f'Unknown function type: {func_type}'
+ err_msg = f"Unknown function type: {func_type}"
raise NotImplementedError(err_msg)
return self
def __init__(
- self,
- name: str,
- exec: Function,
- listen_event_types: List[str],
+ self,
+ name: str,
+ exec: Function,
+ listen_event_types: List[str],
) -> None:
"""Action will check function signature when init."""
super().__init__(name=name, exec=exec,
listen_event_types=listen_event_types)
- #TODO: Update expected signature after import State and Context.
+ # TODO: Update expected signature after import State and Context.
self.exec.check_signature(Event, RunnerContext)
+
+def process_chat_request(event: ChatRequestEvent, ctx: RunnerContext) -> None:
+ """Built-in action for processing a chat request."""
+ chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
+ response = chat_model.chat(event.messages)
+ # call tool
+ if len(response.tool_calls) > 0:
+ for tool_call in response.tool_calls:
+ # store the tool call context in short term memory
+ state = ctx.get_short_term_memory()
+ if not state.is_exist("__tool_context"):
+ state.set("__tool_context", {})
+ tool_context = state.get("__tool_context")
+ tool_call_id = tool_call["id"]
+ tool_context[tool_call_id] = event
+ tool_context[tool_call_id].messages.append(response)
+ ctx.send_event(
+ ToolRequestEvent(
+ id=tool_call_id,
+ tool=tool_call["function"]["name"],
+ kwargs=tool_call["function"]["arguments"],
+ )
+ )
+
+ # send response
+ else:
+ ctx.send_event(ChatResponseEvent(request=event, response=response))
+
+
+def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> None:
+ """Built-in action for processing a tool call request."""
+ tool = ctx.get_resource(event.tool, ResourceType.TOOL)
+ response = tool.call(**event.kwargs)
+ ctx.send_event(ToolResponseEvent(request=event, response=response))
+
+
+def process_tool_response(event: ToolResponseEvent, ctx: RunnerContext) ->
None:
+ """Built-in action for processing a tool call response."""
+ state = ctx.get_short_term_memory()
+
+ if state.is_exist("__tool_context"):
+ tool_context = state.get("__tool_context")
+ tool_call_id = event.request.id
+ if tool_call_id in tool_context:
+ # get the tool call context from short term memory
+ tool_context = tool_context[tool_call_id]
+ tool_context.messages.append(
+ ChatMessage(role=MessageRole.TOOL, content=str(event.response))
+ )
+ ctx.send_event(tool_context)
+
+
+PROCESS_CHAT_REQUEST = Action(
+ name="process_chat_request",
+ exec=PythonFunction.from_callable(process_chat_request),
+
listen_event_types=[f"{ChatRequestEvent.__module__}.{ChatRequestEvent.__name__}"],
+)
+PROCESS_TOOL_REQUEST = Action(
+ name="process_tool_request",
+ exec=PythonFunction.from_callable(process_tool_request),
+
listen_event_types=[f"{ToolRequestEvent.__module__}.{ToolRequestEvent.__name__}"],
+)
+PROCESS_TOOL_RESPONSE = Action(
+ name="process_tool_response",
+ exec=PythonFunction.from_callable(process_tool_response),
+
listen_event_types=[f"{ToolResponseEvent.__module__}.{ToolResponseEvent.__name__}"],
+)
+
+BUILT_IN_ACTIONS = [PROCESS_CHAT_REQUEST, PROCESS_TOOL_REQUEST,
PROCESS_TOOL_RESPONSE]
Review Comment:
Make sense. But ChatRequestEvent/ChatResponseEvent should be in api module,
for user can sent or listen them in user defined action, while ChatAction
should in plan module. So I organize builtin actions in plan module by
functionality and organize events in api module by functionality.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]