xintongsong commented on code in PR #32:
URL: https://github.com/apache/flink-agents/pull/32#discussion_r2160210056
##########
python/flink_agents/api/workflow.py:
##########
@@ -0,0 +1,57 @@
+import importlib
+from collections import deque
+from typing import Any, Dict, final
+from uuid import UUID
+
+from flink_agents.api.workflow_runner import WorkflowRunner
+
+
+class Workflow:
+ """Base class for defining workflow logic.
+
+ Attributes:
+ ----------
+ __runner : WorkflowRunner
+ Internal workflow runner instance used to execute the workflow.
+ """
+
+ __runner: WorkflowRunner = None
Review Comment:
The workflow should not be aware of the runner.
##########
python/flink_agents/api/workflow_runner.py:
##########
@@ -0,0 +1,40 @@
+from abc import ABC, abstractmethod
+from collections import deque
+from typing import Any, Dict
+from uuid import UUID
+
+
+class WorkflowRunner(ABC):
Review Comment:
The workflow runner should not be a public API. We don't want users to call
it or implement it.
##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,31 @@
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.workflow import Workflow
+from flink_agents.api.workflow_runner_context import WorkflowRunnerContext
+
+
+class MyEvent(Event): #noqa D101
+ value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+ @action(InputEvent)
+ @staticmethod
+ def first_action(event: Event, ctx: WorkflowRunnerContext): #noqa D102
+ event.input += " first_action"
Review Comment:
This is not a good example. Events received by an action should be
immutable, and we should not encourage users to modify them.
##########
python/flink_agents/api/workflow_runner_context.py:
##########
@@ -0,0 +1,47 @@
+from abc import ABC, abstractmethod
+from collections import deque
+from typing import Any
+
+from flink_agents.api.event import Event
+
+
+class WorkflowRunnerContext(ABC):
Review Comment:
Let's just call this `RunnerContext`.
##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,31 @@
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.workflow import Workflow
+from flink_agents.api.workflow_runner_context import WorkflowRunnerContext
+
+
+class MyEvent(Event): #noqa D101
+ value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+ @action(InputEvent)
+ @staticmethod
+ def first_action(event: Event, ctx: WorkflowRunnerContext): #noqa D102
+ event.input += " first_action"
+ ctx.send_event(MyEvent(value=event.input))
+ ctx.send_event(OutputEvent(output=event.input))
+
+ @action(MyEvent)
+ @staticmethod
+ def second_action(event: Event, ctx: WorkflowRunnerContext): #noqa D102
+ event.value += " second_action"
+ ctx.send_event(OutputEvent(output=event.value))
+
+
+if __name__ == "__main__":
+ workflow = MyWorkflow()
+ session_id = workflow.run(input="input", runner='LocalRunner')
+ for output in workflow.get_outputs(session_id):
+ print(output)
Review Comment:
1. We should not have `run` and `get_outputs` on a workflow, because they
only make sense in local runner. It would be problematic if user calls them
when executing in Flink runner.
2. We probably should replace the concept of `session_id` with `key`.
3. If `workflow.run` is only for local runner, we should not require user to
specify the `runner='LocalRunner'`
##########
python/flink_agents/api/decorators.py:
##########
@@ -0,0 +1,31 @@
+from typing import Callable, Tuple, Type
+
+from flink_agents.api.event import Event
+
+
+#TODO: implement Closure to support access self in action function, like
llama-index
Review Comment:
What does this mean?
##########
python/flink_agents/plan/workflow_plan_utils.py:
##########
@@ -0,0 +1,27 @@
+from typing import List
+
+from flink_agents.api.workflow import Workflow
+from flink_agents.plan.action import Action
+from flink_agents.plan.function import PythonFunction
+
+
+def get_actions(workflow: Workflow) -> List[Action]:
Review Comment:
Why introducing this util file?
##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,31 @@
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.workflow import Workflow
+from flink_agents.api.workflow_runner_context import WorkflowRunnerContext
+
+
+class MyEvent(Event): #noqa D101
+ value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+ @action(InputEvent)
+ @staticmethod
Review Comment:
Is it possible to get rid of this `@staticmethod`?
##########
python/flink_agents/api/workflow_runner_context.py:
##########
@@ -0,0 +1,47 @@
+from abc import ABC, abstractmethod
+from collections import deque
+from typing import Any
+
+from flink_agents.api.event import Event
+
+
+class WorkflowRunnerContext(ABC):
+ """Abstract base class providing context for workflow execution.
+
+ This context provides access to event handling, session management,
+ and external resources.
+ """
+
+ @abstractmethod
+ def send_event(self, event: Event) -> None:
+ """Send an event to the workflow for processing.
+
+ Parameters
+ ----------
+ event : Event
+ The event to be processed by the workflow system.
+ """
+
+ @abstractmethod
+ def add_output(self, output: Any) -> None:
+ """Add an output generate by workflow execution to the context.
+
+ Parameters
+ ----------
+ output : Any
+ The output to be added to the queue.
+ """
+
+ @abstractmethod
+ def get_outputs(self) -> deque[Any]:
+ """Get outputs stored in this context.
+
+ Returns:
+ -------
+ deque[Any]
+ The outputs generated by workflow execution on this context.
+ """
+
+ @abstractmethod
+ def clear_output(self) -> None:
+ """Clear outputs stored in this context."""
Review Comment:
Why do we need these in the user-facing API?
##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,31 @@
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.workflow import Workflow
+from flink_agents.api.workflow_runner_context import WorkflowRunnerContext
+
+
+class MyEvent(Event): #noqa D101
+ value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+ @action(InputEvent)
+ @staticmethod
+ def first_action(event: Event, ctx: WorkflowRunnerContext): #noqa D102
+ event.input += " first_action"
+ ctx.send_event(MyEvent(value=event.input))
+ ctx.send_event(OutputEvent(output=event.input))
+
+ @action(MyEvent)
+ @staticmethod
+ def second_action(event: Event, ctx: WorkflowRunnerContext): #noqa D102
+ event.value += " second_action"
+ ctx.send_event(OutputEvent(output=event.value))
+
+
+if __name__ == "__main__":
+ workflow = MyWorkflow()
+ session_id = workflow.run(input="input", runner='LocalRunner')
+ for output in workflow.get_outputs(session_id):
+ print(output)
Review Comment:
I'd suggest something like:
```
in_q = Queue()
out_q = (FlinkAgentsExecutionEnvironment
.from_queue(in_q)
.apply(workflow)
.to_queue()
)
in_q.put(("intput1", "key"))
in_q.put("input2") # automatically generate new unique key
while not out_q.empty():
(key, output) = out_q.get()
print(key, output)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]