This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 0b55650d09 refactor(loop): dedup loop-operator eval/state helpers, drop redundant flag
0b55650d09 is described below

commit 0b55650d0986a13c8684ee0513194299b2e231b4
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 15:37:11 2026 -0700

    refactor(loop): dedup loop-operator eval/state helpers, drop redundant flag
    
    A /simplify pass over the loop changes in operator.py:
    
    - Extract `_strip_reserved(state)` -- the `{k: v ... if k not in
      _RESERVED_STATE_KEYS}` comprehension was copy-pasted three times
      (produce_state_on_finish + twice in run_update).
    - Extract `_eval_loop_expr(expr, state, table)` -- LoopStart.eval_output and
      LoopEnd.eval_condition were near-identical "exec the expression as
      `output` in a throwaway namespace" evaluators.
    - Drop LoopEndOperator._consumed_state: it was redundant with _loop_table,
      which is None exactly until run_update consumes a matching state.
      condition() now short-circuits on `self._loop_table is None`.
    
    Behavior unchanged; test_loop_operators / test_main_loop /
    test_output_manager stay green (the one unit test that pinned the flag now
    pins the equivalent _loop_table state). Net fewer lines and no duplicated
    logic.
---
 amber/src/main/python/core/models/operator.py      | 77 ++++++++++------------
 .../test/python/core/models/test_loop_operators.py | 11 ++--
 2 files changed, 41 insertions(+), 47 deletions(-)

diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py
index 5ec07e1164..153267aab2 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -314,6 +314,26 @@ class TableOperator(TupleOperatorV2):
 _RESERVED_STATE_KEYS: frozenset = frozenset({"table", "output"})
 
 
+def _strip_reserved(state: State) -> State:
+    """Return ``state`` without the runtime-reserved keys (``table`` / ``output``)."""
+    return {
+        key: value for key, value in state.items() if key not in _RESERVED_STATE_KEYS
+    }
+
+
+def _eval_loop_expr(expr: str, state: State, table: Optional[Table]):
+    """Evaluate ``expr`` as ``output`` against the loop variables plus ``table``.
+
+    Runs in a throwaway namespace seeded with the loop variables and ``table``
+    so those reserved names neither leak into nor are clobbered out of the
+    persistent loop ``state``. Shared by LoopStart's ``output`` expression and
+    LoopEnd's ``condition``.
+    """
+    namespace = {**state, "table": table}
+    exec("output = " + expr, {}, namespace)
+    return namespace["output"]
+
+
 class LoopStartOperator(TableOperator):
     """Base class for the runtime side of a Loop Start operator.
 
@@ -372,14 +392,7 @@ class LoopStartOperator(TableOperator):
 
     @overrides.final
     def eval_output(self, output_expr: str, table: Table) -> TableLike:
-        # Run the user's `output` expression in a throwaway namespace seeded
-        # with the loop variables and the input `table`. This lets user code
-        # read `table` and define `output` without those reserved names leaking
-        # into -- or being silently clobbered out of -- the persistent loop
-        # state (self.state), addressing the exec-namespace collision.
-        namespace = {**self.state, "table": table}
-        exec("output = " + output_expr, {}, namespace)
-        return namespace["output"]
+        return _eval_loop_expr(output_expr, self.state, table)
 
     @overrides.final
     def produce_state_on_finish(self, port: int) -> State:
@@ -387,15 +400,9 @@ class LoopStartOperator(TableOperator):
         # matching LoopEnd. The table rides as an Apache Arrow IPC stream, not
         # pickle bytes: the receiving LoopEnd would otherwise have to
         # `pickle.loads` data that lives in iceberg, a remote-code-execution
-        # surface. `table`/`output` are runtime-reserved and are not kept in
-        # self.state, so drop any stray ones before adding the real table.
-        # Reads the buffer through `_buffered_table` so a rename of
+        # surface. Reads the buffer through `_buffered_table` so a rename of
         # `TableOperator` doesn't silently break this.
-        produced = {
-            key: value
-            for key, value in self.state.items()
-            if key not in _RESERVED_STATE_KEYS
-        }
+        produced = _strip_reserved(self.state)
         produced["table"] = table_to_ipc_bytes(self._buffered_table(port))
         return produced
 
@@ -452,12 +459,11 @@ class LoopEndOperator(TableOperator):
         # that never consumed a matching state (an inner LoopEnd that only
         # forwarded outer-loop pass-through state, or a loop that completed
         # without a matching-branch consume). run_update is what populates
-        # self.state / self._loop_table, so initialize them -- plus a
-        # "consumed" flag eval_condition checks -- to avoid AttributeError and
-        # to terminate cleanly in that case.
+        # self.state / self._loop_table, so initialize them here to avoid
+        # AttributeError; a None _loop_table means "nothing consumed yet" and
+        # condition() short-circuits to False (see eval_condition).
         self.state: State = {}
         self._loop_table: Optional[Table] = None
-        self._consumed_state: bool = False
 
     @overrides.final
     def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
@@ -470,37 +476,24 @@ class LoopEndOperator(TableOperator):
         # user variables back into self.state. The table arrives as an Apache
         # Arrow IPC stream (see LoopStartOperator.produce_state_on_finish), so
         # it is decoded structurally rather than via pickle.loads -- no
-        # remote-code-execution surface. `table`/`output` are runtime-reserved
-        # and never persist, so user code cannot silently clobber loop
-        # machinery through them. The real input table is kept on
+        # remote-code-execution surface. The real input table is kept on
         # self._loop_table so condition() can read it after the update.
         table = table_from_ipc_bytes(state["table"])
-        namespace = {
-            key: value
-            for key, value in state.items()
-            if key not in _RESERVED_STATE_KEYS
-        }
+        namespace = _strip_reserved(state)
         namespace["table"] = table
         exec(update_code, {}, namespace)
         self._loop_table = table
-        self.state = {
-            key: value
-            for key, value in namespace.items()
-            if key not in _RESERVED_STATE_KEYS
-        }
-        self._consumed_state = True
+        self.state = _strip_reserved(namespace)
 
     @overrides.final
     def eval_condition(self, condition_expr: str) -> bool:
-        # No matching state was consumed (run_update never ran): the loop never
-        # iterated here, so do not continue. Returning False also avoids
-        # evaluating the user's condition against loop variables that don't
-        # exist yet (which would raise NameError).
-        if not self._consumed_state:
+        # No matching state was consumed (run_update never ran, so _loop_table
+        # is still None): the loop never iterated here, so do not continue.
+        # Returning False also avoids evaluating the user's condition against
+        # loop variables that don't exist yet (which would raise NameError).
+        if self._loop_table is None:
             return False
-        namespace = {**self.state, "table": self._loop_table}
-        exec("output = " + condition_expr, {}, namespace)
-        return namespace["output"]
+        return _eval_loop_expr(condition_expr, self.state, self._loop_table)
 
     @abstractmethod
     def condition(self) -> bool:
diff --git a/amber/src/test/python/core/models/test_loop_operators.py b/amber/src/test/python/core/models/test_loop_operators.py
index 35aa7a20cf..c2d52e6064 100644
--- a/amber/src/test/python/core/models/test_loop_operators.py
+++ b/amber/src/test/python/core/models/test_loop_operators.py
@@ -247,16 +247,17 @@ class TestLoopEndBase:
         op = _StubLoopEnd(condition_expr="i < len(table)")
         assert op.condition() is False
 
-    def test_consumed_flag_flips_after_run_update(self):
-        # Before any consume the loop hasn't run here; after run_update the
-        # real condition is evaluated against the consumed state.
+    def test_loop_table_set_after_run_update(self):
+        # _loop_table is None until a matching state is consumed (that None is
+        # what condition() short-circuits on); after run_update it holds the
+        # decoded table and the real condition is evaluated.
         op = _StubLoopEnd(update="i += 1", condition_expr="i < 3")
-        assert op._consumed_state is False
+        assert op._loop_table is None
         op.process_state(
             State({"i": 0, "table": table_to_ipc_bytes(Table([Tuple({"v": 1})]))}),
             port=0,
         )
-        assert op._consumed_state is True
+        assert op._loop_table is not None
         assert op.condition() is True  # i became 1, 1 < 3
 
 

Reply via email to