This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new f3b99e0dcf update
f3b99e0dcf is described below
commit f3b99e0dcfb29665f504e0baf6f521bbfef256d0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Mar 26 01:35:00 2026 -0700
update
---
amber/src/main/python/core/models/state.py | 9 +++++----
.../org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala | 2 +-
2 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index 18e2ed089f..75ebd2001a 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -52,11 +52,12 @@ class State:
def add(
self, key: str, value: any, value_type: Optional[AttributeType] = None
) -> None:
+ if key not in self.__dict__:
+ if value_type is not None:
+ self.schema.add(key, value_type)
+ elif key != "schema":
+ self.schema.add(key, FROM_PYOBJECT_MAPPING[type(value)])
self.__dict__[key] = value
- if value_type is not None:
- self.schema.add(key, value_type)
- elif key != "schema":
- self.schema.add(key, FROM_PYOBJECT_MAPPING[type(value)])
def get(self, key: str) -> any:
return self.__dict__[key]
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
index 045aa87cba..42d455b61e 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
@@ -75,8 +75,8 @@ class LoopEndOpDesc extends LogicalOp {
|class ProcessLoopEndOperator(LoopEndOperator):
| @overrides
| def process_state(self, state: State, port: int) -> Optional[State]:
- | from pickle import loads
| self.state = state.to_dict()
+ | from pickle import loads
| self.state["table"] = loads(self.state["table"])
| exec("$update", {}, self.state)
| return None