This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new ddf413b3c9 update
ddf413b3c9 is described below

commit ddf413b3c974ef61832f839c304279098e3513cf
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Mar 22 00:53:10 2026 -0700

    update
---
 amber/src/main/python/core/models/operator.py                  | 10 +++++++++-
 amber/src/main/python/core/models/state.py                     |  2 +-
 .../apache/texera/amber/operator/loop/LoopStartOpDesc.scala    | 10 +---------
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py
index ea21438abd..f477ec0ae7 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -303,10 +303,18 @@ class LoopStartOperator(TableOperator):
     def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
         yield
 
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        state_dict = state.to_dict()
+        if "LoopStartStateURI" in state_dict:
+            state["loop_counter"] += 1
+            return state
+        self.state.update(state_dict)
+        return None
+
     @overrides.final
     def produce_state_on_finish(self, port: int) -> State:
         from pickle import dumps
-
         self.state["table"] = dumps(Table(self._TableOperator__table_data[port]))
         return State().from_dict(self.state)
 
diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index 2036065483..18e2ed089f 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -68,7 +68,7 @@ class State:
         )
 
     def to_dict(self) -> dict:
-        dictionary = self.__dict__
+        dictionary = self.__dict__.copy()
         del dictionary["passToAllDownstream"]
         del dictionary["schema"]
         return dictionary
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
index fe59907b25..baf1f4f409 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
@@ -75,18 +75,10 @@ class LoopStartOpDesc extends LogicalOp {
        |class ProcessLoopStartOperator(LoopStartOperator):
        |    @overrides
        |    def open(self):
-       |        self.state = {}
+       |        self.state = {"loop_counter": 0}
        |        exec("$initialization", {}, self.state)
        |
        |    @overrides
-       |    def process_state(self, state: State, port: int) -> Optional[State]:
-       |        state_dict = state.to_dict()
-       |        if "LoopStartStateURI" in state_dict:
-       |            return state_dict
-       |        self.state.update(state_dict)
-       |        return None
-       |
-       |    @overrides
        |    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
        |        self.state["table"] = table
        |        exec("output = $output", {}, self.state)

Reply via email to