xintongsong commented on code in PR #32:
URL: https://github.com/apache/flink-agents/pull/32#discussion_r2161337602


##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import time
+from collections import deque
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.workflow import Workflow
+
+
+class MyEvent(Event): #noqa D101
+    value: Any
+
+class MyWorkflow(Workflow): #noqa D101

Review Comment:
   The story of this example doesn't really make sense. IIUC, this is more of 
a temporary example for development and testing purposes. We should explain 
this in the docstring, and add a TODO to remove it or replace it with something 
meaningful.



##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import time
+from collections import deque
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.workflow import Workflow
+
+
+class MyEvent(Event): #noqa D101
+    value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+    @action(InputEvent)
+    @staticmethod
+    def first_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.input
+        content = input + ' first_action'
+        ctx.send_event(MyEvent(value=content))
+        ctx.send_event(OutputEvent(output=content))
+
+    @action(MyEvent)
+    @staticmethod
+    def second_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.value
+        content = input + ' second_action'
+        ctx.send_event(OutputEvent(output=content))
+
+
+if __name__ == "__main__":
+    env = AgentsExecutionEnvironment.get_execution_environment(module=__name__)
+
+    input_queue = deque()
+    workflow = MyWorkflow()
+
+    output_queue = env.from_input(input_queue).apply(workflow).to_output()
+
+    env.execute()
+
+    input_queue.append({'key': 'bob', 'value': 'The message from bob'})
+    input_queue.append({'key': 'john', 'value': 'The message from john'})
+    input_queue.append({'value': 'The message from unknow'}) # will automatically generate a new unique key
+    input_queue.append({'finish': True}) # mark source finish
+
+    time.sleep(1) # wait a second to get outputs

Review Comment:
   Having this `finish` marker and the sleep is a bit too complex.
   
   I think we can simplify this by adding all the inputs to the input queue 
before executing the workflow, and making `env.execute()` a blocking call that 
finishes after consuming all the inputs and emitting all the outputs. In that 
case, we can also replace the queues with lists.
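
A minimal sketch of that flow, assuming a hypothetical stand-in function 
`execute_blocking` rather than the real `AgentsExecutionEnvironment` API: all 
inputs are collected in a list up front, and the call returns only after every 
input has been consumed and every output emitted.

```python
from typing import Any, Callable, List


def execute_blocking(
    inputs: List[Any], handler: Callable[[Any], List[Any]]
) -> List[Any]:
    """Hypothetical stand-in for a blocking ``env.execute()``.

    Consumes every input in order and returns once all outputs are emitted.
    """
    outputs: List[Any] = []
    for item in inputs:
        outputs.extend(handler(item))
    return outputs


def two_step_handler(message: str) -> List[str]:
    # Mirrors the example workflow: each input yields two outputs.
    first = message + " first_action"
    second = first + " second_action"
    return [first, second]


# All inputs are known before execution, so no 'finish' marker or sleep is needed.
input_list = ["The message from bob", "The message from john"]
output_list = execute_blocking(input_list, two_step_handler)
```

Because the call blocks until the list is exhausted, the caller can read 
`output_list` immediately afterwards.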



##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import time
+from collections import deque
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.workflow import Workflow
+
+
+class MyEvent(Event): #noqa D101
+    value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+    @action(InputEvent)
+    @staticmethod
+    def first_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.input
+        content = input + ' first_action'
+        ctx.send_event(MyEvent(value=content))
+        ctx.send_event(OutputEvent(output=content))
+
+    @action(MyEvent)
+    @staticmethod
+    def second_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.value
+        content = input + ' second_action'
+        ctx.send_event(OutputEvent(output=content))
+
+
+if __name__ == "__main__":
+    env = AgentsExecutionEnvironment.get_execution_environment(module=__name__)
+
+    input_queue = deque()
+    workflow = MyWorkflow()
+
+    output_queue = env.from_input(input_queue).apply(workflow).to_output()
+
+    env.execute()
+
+    input_queue.append({'key': 'bob', 'value': 'The message from bob'})
+    input_queue.append({'key': 'john', 'value': 'The message from john'})
+    input_queue.append({'value': 'The message from unknow'}) # will automatically generate a new unique key

Review Comment:
   Typo: 'unknow' should be 'unknown'.



##########
python/flink_agents/api/workflow.py:
##########
@@ -0,0 +1,19 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+class Workflow:

Review Comment:
   This should be an abstract class.
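
One possible way to do this, sketched with Python's standard `abc` module. The 
`__init__` guard is an assumption of this sketch, needed because `ABC` alone 
only blocks instantiation when the class declares abstract methods, and none 
are shown in this file.

```python
from abc import ABC


class Workflow(ABC):
    """Base class for workflows; not meant to be instantiated directly."""

    def __init__(self) -> None:
        # ABC by itself does not reject instantiation without abstract methods,
        # so this guard (an assumption of the sketch) rejects the bare base class.
        if type(self) is Workflow:
            raise TypeError("Workflow is abstract; subclass it instead")


class MyWorkflow(Workflow):
    """Concrete subclass, as in the example file."""
```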



##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import time
+from collections import deque
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.workflow import Workflow
+
+
+class MyEvent(Event): #noqa D101
+    value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+    @action(InputEvent)
+    @staticmethod
+    def first_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.input
+        content = input + ' first_action'
+        ctx.send_event(MyEvent(value=content))
+        ctx.send_event(OutputEvent(output=content))
+
+    @action(MyEvent)
+    @staticmethod
+    def second_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.value
+        content = input + ' second_action'
+        ctx.send_event(OutputEvent(output=content))
+
+
+if __name__ == "__main__":
+    env = AgentsExecutionEnvironment.get_execution_environment(module=__name__)
+
+    input_queue = deque()
+    workflow = MyWorkflow()
+
+    output_queue = env.from_input(input_queue).apply(workflow).to_output()
+
+    env.execute()
+
+    input_queue.append({'key': 'bob', 'value': 'The message from bob'})

Review Comment:
   A dict is a bit complex here, as users need to use the correct keys. I'd 
suggest using a tuple instead.
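
A rough sketch of what tuple-based inputs could look like. The `(key, value)` 
shape, the use of `None` for a missing key, and the helper `with_keys` are all 
assumptions made for illustration, not part of the PR.

```python
import uuid
from typing import Any, List, Optional, Tuple

# Hypothetical input shape: (key, value), where key may be None.
InputRecord = Tuple[Optional[Any], Any]


def with_keys(records: List[InputRecord]) -> List[Tuple[Any, Any]]:
    """Fill in a generated unique key wherever the key is None."""
    return [
        (key if key is not None else uuid.uuid4(), value)
        for key, value in records
    ]


records: List[InputRecord] = [
    ("bob", "The message from bob"),
    ("john", "The message from john"),
    (None, "The message from unknown"),  # key is generated
]
keyed = with_keys(records)
```

With positional fields there are no string keys such as `'key'` or `'value'` 
for users to misspell.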



##########
python/flink_agents/runtime/local_runner.py:
##########
@@ -0,0 +1,198 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import logging
+import uuid
+from collections import deque
+from typing import Any, Dict, Optional
+from uuid import UUID
+
+from typing_extensions import override
+
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.workflow import Workflow
+from flink_agents.plan.workflow_plan import WorkflowPlan
+from flink_agents.runtime.workflow_runner import WorkflowRunner
+
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
+class LocalRunnerContext(RunnerContext):
+    """Implementation of WorkflowRunnerContext for local workflow execution.
+
+    Attributes:
+    ----------
+    __workflow_plan : WorkflowPlan
+        Internal workflow plan for this context.
+    __id : UUID
+        Unique identifier for the context.
+    events : deque[Event]
+        Queue of events to be processed in this context.
+    outputs : deque[Any]
+        Queue of outputs generated by workflow execution.
+    """
+
+    __workflow_plan: WorkflowPlan
+    __id: UUID
+    events: deque[Event]
+    outputs: deque[Any]
+
+    def __init__(self, workflow_plan: WorkflowPlan, id: UUID) -> None:
+        """Initialize a new session with the given workflow and ID.
+
+        Parameters
+        ----------
+        workflow : Workflow
+            Workflow plan used for this context.
+        id : UUID
+            Unique context identifier. If None, a new UUID will be generated.
+        """
+        self.__workflow_plan = workflow_plan
+        if id is None:
+            id = uuid.uuid4()
+        self.__id = id
+        self.events = deque()
+        self.outputs = deque()
+
+    @property
+    def session_id(self) -> UUID:

Review Comment:
   ```suggestion
       def key(self) -> Any:
   ```
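
One reading of this suggestion, sketched with a hypothetical `KeyedContext` 
class (not the PR's `LocalRunnerContext`): the context is identified by the key 
of its input, which may be any value such as `'bob'`, so a `key` property typed 
as `Any` is more general than a `session_id` typed as `UUID`.

```python
from typing import Any


class KeyedContext:
    """Hypothetical context identified by an arbitrary key, not only a UUID."""

    def __init__(self, key: Any) -> None:
        self.__key = key

    @property
    def key(self) -> Any:
        """Return the key this context was created for."""
        return self.__key


ctx = KeyedContext("bob")
```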



##########
python/flink_agents/api/tests/test_decorators.py:
##########
@@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.runner_context import RunnerContext
+
+
+def test_action_decorator() -> None: #noqa D103

Review Comment:
   I think we need more test cases, for:
   - One action listens to multiple events
   - Negative cases for illegal signature
   - Negative cases for listening to non-event types
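
The three cases above could be sketched roughly as follows. The `Event` classes 
and the `action` decorator here are simplified stand-ins written for this 
sketch, and raising `TypeError` on invalid input is an assumption; the real 
decorator in `flink_agents.api.decorators` may validate differently.

```python
import inspect
from typing import Any, Callable


class Event:
    """Stand-in base event."""


class InputEvent(Event):
    """Stand-in input event."""


class OutputEvent(Event):
    """Stand-in output event."""


def action(*listen_events: Any) -> Callable:
    """Simplified stand-in for the decorator under test (assumed behavior)."""
    for event_type in listen_events:
        if not (isinstance(event_type, type) and issubclass(event_type, Event)):
            raise TypeError(f"{event_type!r} is not an Event type")

    def decorator(func: Callable) -> Callable:
        # Assumed rule: an action takes exactly (event, ctx).
        if len(inspect.signature(func).parameters) != 2:
            raise TypeError("action must accept exactly (event, ctx)")
        func._listen_events = listen_events
        return func

    return decorator


def raises_type_error(build: Callable[[], Any]) -> bool:
    """Return True if calling ``build`` raises TypeError."""
    try:
        build()
    except TypeError:
        return True
    return False


def test_action_listens_to_multiple_events() -> None:
    @action(InputEvent, OutputEvent)
    def handler(event, ctx):
        return None

    assert handler._listen_events == (InputEvent, OutputEvent)


def test_action_rejects_illegal_signature() -> None:
    def build() -> None:
        @action(InputEvent)
        def handler(event):  # missing the ctx parameter
            return None

    assert raises_type_error(build)


def test_action_rejects_non_event_type() -> None:
    assert raises_type_error(lambda: action(str))
```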



##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import time
+from collections import deque
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.workflow import Workflow
+
+
+class MyEvent(Event): #noqa D101
+    value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+    @action(InputEvent)
+    @staticmethod
+    def first_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.input
+        content = input + ' first_action'
+        ctx.send_event(MyEvent(value=content))
+        ctx.send_event(OutputEvent(output=content))
+
+    @action(MyEvent)
+    @staticmethod
+    def second_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.value
+        content = input + ' second_action'
+        ctx.send_event(OutputEvent(output=content))
+
+
+if __name__ == "__main__":
+    env = AgentsExecutionEnvironment.get_execution_environment(module=__name__)
+
+    input_queue = deque()
+    workflow = MyWorkflow()
+
+    output_queue = env.from_input(input_queue).apply(workflow).to_output()

Review Comment:
   ```suggestion
       output_queue = env.from_queue(input_queue).apply(workflow).to_queue()
   ```



##########
python/flink_agents/examples/workflow_example.py:
##########
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import time
+from collections import deque
+from typing import Any
+
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.workflow import Workflow
+
+
+class MyEvent(Event): #noqa D101
+    value: Any
+
+class MyWorkflow(Workflow): #noqa D101
+    @action(InputEvent)
+    @staticmethod
+    def first_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.input
+        content = input + ' first_action'
+        ctx.send_event(MyEvent(value=content))
+        ctx.send_event(OutputEvent(output=content))
+
+    @action(MyEvent)
+    @staticmethod
+    def second_action(event: Event, ctx: RunnerContext): #noqa D102
+        input = event.value
+        content = input + ' second_action'
+        ctx.send_event(OutputEvent(output=content))
+
+
+if __name__ == "__main__":
+    env = AgentsExecutionEnvironment.get_execution_environment(module=__name__)
+
+    input_queue = deque()
+    workflow = MyWorkflow()
+
+    output_queue = env.from_input(input_queue).apply(workflow).to_output()

Review Comment:
   The reason for the rename suggested above: later we will also support 
DataStream and Table as input and output.



##########
python/flink_agents/runtime/local_runner.py:
##########
@@ -0,0 +1,198 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import logging
+import uuid
+from collections import deque
+from typing import Any, Dict, Optional
+from uuid import UUID
+
+from typing_extensions import override
+
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.workflow import Workflow
+from flink_agents.plan.workflow_plan import WorkflowPlan
+from flink_agents.runtime.workflow_runner import WorkflowRunner
+
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
+class LocalRunnerContext(RunnerContext):
+    """Implementation of WorkflowRunnerContext for local workflow execution.
+
+    Attributes:
+    ----------
+    __workflow_plan : WorkflowPlan
+        Internal workflow plan for this context.
+    __id : UUID
+        Unique identifier for the context.
+    events : deque[Event]
+        Queue of events to be processed in this context.
+    outputs : deque[Any]
+        Queue of outputs generated by workflow execution.
+    """
+
+    __workflow_plan: WorkflowPlan
+    __id: UUID

Review Comment:
   ```suggestion
       __key: Any
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
