This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new ddf413b3c9 update
ddf413b3c9 is described below
commit ddf413b3c974ef61832f839c304279098e3513cf
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Mar 22 00:53:10 2026 -0700
update
---
amber/src/main/python/core/models/operator.py | 10 +++++++++-
amber/src/main/python/core/models/state.py | 2 +-
.../apache/texera/amber/operator/loop/LoopStartOpDesc.scala | 10 +---------
3 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/amber/src/main/python/core/models/operator.py
b/amber/src/main/python/core/models/operator.py
index ea21438abd..f477ec0ae7 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -303,10 +303,18 @@ class LoopStartOperator(TableOperator):
def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]]:
yield
+ @overrides.final
+ def process_state(self, state: State, port: int) -> Optional[State]:
+ state_dict = state.to_dict()
+ if "LoopStartStateURI" in state_dict:
+ state["loop_counter"] += 1
+ return state
+ self.state.update(state_dict)
+ return None
+
@overrides.final
def produce_state_on_finish(self, port: int) -> State:
from pickle import dumps
-
self.state["table"] =
dumps(Table(self._TableOperator__table_data[port]))
return State().from_dict(self.state)
diff --git a/amber/src/main/python/core/models/state.py
b/amber/src/main/python/core/models/state.py
index 2036065483..18e2ed089f 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -68,7 +68,7 @@ class State:
)
def to_dict(self) -> dict:
- dictionary = self.__dict__
+ dictionary = self.__dict__.copy()
del dictionary["passToAllDownstream"]
del dictionary["schema"]
return dictionary
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
index fe59907b25..baf1f4f409 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
@@ -75,18 +75,10 @@ class LoopStartOpDesc extends LogicalOp {
|class ProcessLoopStartOperator(LoopStartOperator):
| @overrides
| def open(self):
- | self.state = {}
+ | self.state = {"loop_counter": 0}
| exec("$initialization", {}, self.state)
|
| @overrides
- | def process_state(self, state: State, port: int) ->
Optional[State]:
- | state_dict = state.to_dict()
- | if "LoopStartStateURI" in state_dict:
- | return state_dict
- | self.state.update(state_dict)
- | return None
- |
- | @overrides
| def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]]:
| self.state["table"] = table
| exec("output = $output", {}, self.state)