xintongsong commented on code in PR #138:
URL: https://github.com/apache/flink-agents/pull/138#discussion_r2336508433
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -138,6 +145,7 @@ public ActionExecutionOperator(
this.mailboxExecutor = mailboxExecutor;
this.eventLogger =
EventLoggerFactory.createLogger(EventLoggerConfig.builder().build());
this.eventListeners = new ArrayList<>();
+ this.actionStateStore = new KafkaActionStateStore();
Review Comment:
It might be better making this configurable. Although Kafka-based store is
the only option we currently provide, users should have another option to not
providing an external kafka service and disable this feature.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -301,20 +309,47 @@ private void processActionTaskForKey(Object key) throws
Exception {
// 2. Invoke the action task.
createAndSetRunnerContext(actionTask);
- ActionTask.ActionTaskResult actionTaskResult = actionTask.invoke();
- for (Event actionOutputEvent : actionTaskResult.getOutputEvents()) {
+
+ boolean isFinished = false;
+ List<Event> outputEvents;
+ Optional<ActionTask> generatedActionTaskOpt;
+ ActionState actionState = actionStateStore.get(key, actionTask.action);
Review Comment:
The key and action do not uniquely identify an action task. An action can be
triggered multiple times, by different events. I think we should add the event
id as part of the identifier.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -301,20 +309,47 @@ private void processActionTaskForKey(Object key) throws
Exception {
// 2. Invoke the action task.
createAndSetRunnerContext(actionTask);
- ActionTask.ActionTaskResult actionTaskResult = actionTask.invoke();
- for (Event actionOutputEvent : actionTaskResult.getOutputEvents()) {
+
+ boolean isFinished = false;
+ List<Event> outputEvents;
+ Optional<ActionTask> generatedActionTaskOpt;
+ ActionState actionState = actionStateStore.get(key, actionTask.action);
+ if (actionState != null) {
+ isFinished = actionState.getGeneratedActionTask().isPresent();
Review Comment:
`isFinished` should be `false` when generated action task is present.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -301,20 +309,47 @@ private void processActionTaskForKey(Object key) throws
Exception {
// 2. Invoke the action task.
createAndSetRunnerContext(actionTask);
- ActionTask.ActionTaskResult actionTaskResult = actionTask.invoke();
- for (Event actionOutputEvent : actionTaskResult.getOutputEvents()) {
+
+ boolean isFinished = false;
+ List<Event> outputEvents;
+ Optional<ActionTask> generatedActionTaskOpt;
+ ActionState actionState = actionStateStore.get(key, actionTask.action);
+ if (actionState != null) {
+ isFinished = actionState.getGeneratedActionTask().isPresent();
+ outputEvents = actionState.getOutputEvents();
+ generatedActionTaskOpt = actionState.getGeneratedActionTask();
+ for (MemoryUpdate memoryUpdate : actionState.getMemoryUpdates()) {
+ actionTask
+ .getRunnerContext()
+ .getShortTermMemory()
+ .set(memoryUpdate.getPath(), memoryUpdate.getValue());
+ }
+ } else {
Review Comment:
In python async execution, if an action task is invoked but not finished, it
will be put back to `actionTasksKState` and re-invoked later. In that case, it
will falls into the `if (actionState != null)` clause, and treated as replay
rather than resume.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java:
##########
@@ -35,6 +36,7 @@ private enum ItemType {
private static final String SEPARATOR = ".";
private final MapState<String, MemoryItem> store;
+ private final List<MemoryUpdate> memoryUpdates;
Review Comment:
I'd suggest to store the memory updates in the runner context. We can pass
it in when creating the memory object.
1. `MemoryObject` is an user-facing API. We should at least keep
`getAllUpdates()` as part of the internal implementation, so that user don't
have the chance to mess-up with it.
2. `MemoryObject` can be hierachical. User may call `set()` on child memory
objects. So calling `getAllUpdates()` for persistence only on the root memory
object is not enough.
3. Conceptually, it's more straightforward that memory object only keeps the
content that should be memorized, and all execution contextual information is
maintained in the runner context.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]