This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 0b55650d09 refactor(loop): dedup loop-operator eval/state helpers, drop redundant flag
0b55650d09 is described below
commit 0b55650d0986a13c8684ee0513194299b2e231b4
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 15:37:11 2026 -0700
refactor(loop): dedup loop-operator eval/state helpers, drop redundant flag
A /simplify pass over the loop changes in operator.py:
- Extract `_strip_reserved(state)` -- the
`{k: v ... if k not in _RESERVED_STATE_KEYS}` comprehension was copy-pasted
three times (once in produce_state_on_finish and twice in run_update).
- Extract `_eval_loop_expr(expr, state, table)` -- LoopStart.eval_output and
LoopEnd.eval_condition were near-identical evaluators that exec the expression
as `output = <expr>` in a throwaway namespace.
- Drop LoopEndOperator._consumed_state: it was redundant with _loop_table,
which is None exactly until run_update consumes a matching state.
eval_condition() now short-circuits on `self._loop_table is None`, so
condition() still returns False before any matching state is consumed.
Behavior unchanged; test_loop_operators, test_main_loop, and test_output_manager
stay green (the one unit test that pinned the flag now pins the equivalent
_loop_table state). Net fewer lines and no duplicated logic.
---
amber/src/main/python/core/models/operator.py | 77 ++++++++++------------
.../test/python/core/models/test_loop_operators.py | 11 ++--
2 files changed, 41 insertions(+), 47 deletions(-)
diff --git a/amber/src/main/python/core/models/operator.py
b/amber/src/main/python/core/models/operator.py
index 5ec07e1164..153267aab2 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -314,6 +314,26 @@ class TableOperator(TupleOperatorV2):
_RESERVED_STATE_KEYS: frozenset = frozenset({"table", "output"})
+def _strip_reserved(state: State) -> State:
+ """Return ``state`` without the runtime-reserved keys (``table`` /
``output``)."""
+ return {
+ key: value for key, value in state.items() if key not in
_RESERVED_STATE_KEYS
+ }
+
+
+def _eval_loop_expr(expr: str, state: State, table: Optional[Table]):
+ """Evaluate ``expr`` as ``output`` against the loop variables plus
``table``.
+
+ Runs in a throwaway namespace seeded with the loop variables and ``table``
+ so those reserved names neither leak into nor are clobbered out of the
+ persistent loop ``state``. Shared by LoopStart's ``output`` expression and
+ LoopEnd's ``condition``.
+ """
+ namespace = {**state, "table": table}
+ exec("output = " + expr, {}, namespace)
+ return namespace["output"]
+
+
class LoopStartOperator(TableOperator):
"""Base class for the runtime side of a Loop Start operator.
@@ -372,14 +392,7 @@ class LoopStartOperator(TableOperator):
@overrides.final
def eval_output(self, output_expr: str, table: Table) -> TableLike:
- # Run the user's `output` expression in a throwaway namespace seeded
- # with the loop variables and the input `table`. This lets user code
- # read `table` and define `output` without those reserved names leaking
- # into -- or being silently clobbered out of -- the persistent loop
- # state (self.state), addressing the exec-namespace collision.
- namespace = {**self.state, "table": table}
- exec("output = " + output_expr, {}, namespace)
- return namespace["output"]
+ return _eval_loop_expr(output_expr, self.state, table)
@overrides.final
def produce_state_on_finish(self, port: int) -> State:
@@ -387,15 +400,9 @@ class LoopStartOperator(TableOperator):
# matching LoopEnd. The table rides as an Apache Arrow IPC stream, not
# pickle bytes: the receiving LoopEnd would otherwise have to
# `pickle.loads` data that lives in iceberg, a remote-code-execution
- # surface. `table`/`output` are runtime-reserved and are not kept in
- # self.state, so drop any stray ones before adding the real table.
- # Reads the buffer through `_buffered_table` so a rename of
+ # surface. Reads the buffer through `_buffered_table` so a rename of
# `TableOperator` doesn't silently break this.
- produced = {
- key: value
- for key, value in self.state.items()
- if key not in _RESERVED_STATE_KEYS
- }
+ produced = _strip_reserved(self.state)
produced["table"] = table_to_ipc_bytes(self._buffered_table(port))
return produced
@@ -452,12 +459,11 @@ class LoopEndOperator(TableOperator):
# that never consumed a matching state (an inner LoopEnd that only
# forwarded outer-loop pass-through state, or a loop that completed
# without a matching-branch consume). run_update is what populates
- # self.state / self._loop_table, so initialize them -- plus a
- # "consumed" flag eval_condition checks -- to avoid AttributeError and
- # to terminate cleanly in that case.
+ # self.state / self._loop_table, so initialize them here to avoid
+ # AttributeError; a None _loop_table means "nothing consumed yet" and
+ # condition() short-circuits to False (see eval_condition).
self.state: State = {}
self._loop_table: Optional[Table] = None
- self._consumed_state: bool = False
@overrides.final
def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]]:
@@ -470,37 +476,24 @@ class LoopEndOperator(TableOperator):
# user variables back into self.state. The table arrives as an Apache
# Arrow IPC stream (see LoopStartOperator.produce_state_on_finish), so
# it is decoded structurally rather than via pickle.loads -- no
- # remote-code-execution surface. `table`/`output` are runtime-reserved
- # and never persist, so user code cannot silently clobber loop
- # machinery through them. The real input table is kept on
+ # remote-code-execution surface. The real input table is kept on
# self._loop_table so condition() can read it after the update.
table = table_from_ipc_bytes(state["table"])
- namespace = {
- key: value
- for key, value in state.items()
- if key not in _RESERVED_STATE_KEYS
- }
+ namespace = _strip_reserved(state)
namespace["table"] = table
exec(update_code, {}, namespace)
self._loop_table = table
- self.state = {
- key: value
- for key, value in namespace.items()
- if key not in _RESERVED_STATE_KEYS
- }
- self._consumed_state = True
+ self.state = _strip_reserved(namespace)
@overrides.final
def eval_condition(self, condition_expr: str) -> bool:
- # No matching state was consumed (run_update never ran): the loop never
- # iterated here, so do not continue. Returning False also avoids
- # evaluating the user's condition against loop variables that don't
- # exist yet (which would raise NameError).
- if not self._consumed_state:
+ # No matching state was consumed (run_update never ran, so _loop_table
+ # is still None): the loop never iterated here, so do not continue.
+ # Returning False also avoids evaluating the user's condition against
+ # loop variables that don't exist yet (which would raise NameError).
+ if self._loop_table is None:
return False
- namespace = {**self.state, "table": self._loop_table}
- exec("output = " + condition_expr, {}, namespace)
- return namespace["output"]
+ return _eval_loop_expr(condition_expr, self.state, self._loop_table)
@abstractmethod
def condition(self) -> bool:
diff --git a/amber/src/test/python/core/models/test_loop_operators.py
b/amber/src/test/python/core/models/test_loop_operators.py
index 35aa7a20cf..c2d52e6064 100644
--- a/amber/src/test/python/core/models/test_loop_operators.py
+++ b/amber/src/test/python/core/models/test_loop_operators.py
@@ -247,16 +247,17 @@ class TestLoopEndBase:
op = _StubLoopEnd(condition_expr="i < len(table)")
assert op.condition() is False
- def test_consumed_flag_flips_after_run_update(self):
- # Before any consume the loop hasn't run here; after run_update the
- # real condition is evaluated against the consumed state.
+ def test_loop_table_set_after_run_update(self):
+ # _loop_table is None until a matching state is consumed (that None is
+ # what condition() short-circuits on); after run_update it holds the
+ # decoded table and the real condition is evaluated.
op = _StubLoopEnd(update="i += 1", condition_expr="i < 3")
- assert op._consumed_state is False
+ assert op._loop_table is None
op.process_state(
State({"i": 0, "table": table_to_ipc_bytes(Table([Tuple({"v":
1})]))}),
port=0,
)
- assert op._consumed_state is True
+ assert op._loop_table is not None
assert op.condition() is True # i became 1, 1 < 3