aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3407229088
##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,177 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
time, or None.
"""
yield
+
+
+# Names the loop runtime owns inside the eval namespaces and across the loop
+# boundary; explicitly stripped from every state dict that crosses Loop
+# Start / Loop End so user code can neither read nor persist them.
+# Other reserved names that used to live in user state -- ``loop_counter``,
+# ``LoopStartId``, ``LoopStartStateURI`` -- are no longer in ``self.state``
+# at all; they ride the StateFrame envelope (see ``core.models.payload``)
+# and are stamped/captured by ``MainLoop._process_state_frame``.
+_RESERVED_STATE_KEYS: frozenset = frozenset({"table", "output"})
+
+
+class LoopStartOperator(TableOperator):
+ """Base class for the runtime side of a Loop Start operator.
+
+ The generator in ``LoopStartOpDesc.scala`` emits a thin
+ ``ProcessLoopStartOperator(LoopStartOperator)`` subclass that does
+ nothing more than wire the user-supplied ``initialization`` and
+ ``output`` expressions into ``open()`` and ``process_table()``; all
+ substantive logic lives here.
+
+ Lifecycle
+ ---------
+ * ``open()`` runs once when the worker starts. The generated subclass
+ executes the user's ``initialization`` against a fresh ``self.state``
+ dict; after it returns ``self.state`` holds *only* the user's loop
+ variables.
+ * ``process_state(state, port)`` (final) runs once when upstream sends
+ this LoopStart its input state; it merges that state into
+ ``self.state``. The nested pass-through branch and all
+ ``loop_counter`` bookkeeping live in
+ ``MainLoop._process_state_frame``, not here.
+ * ``process_table(table, port)`` is provided by the generated subclass
+ and yields a downstream row via ``eval_output(...)`` against the
+ user's ``output`` expression.
+ * ``produce_state_on_finish(port)`` (final) emits the state crossing
+ the boundary to the matching LoopEnd: user variables plus the
+ pickled input table.
+
+ Subclass contract
+ -----------------
+ The generated subclass overrides ``open()`` and ``process_table()``
+ only. All other methods are ``@overrides.final``; do not override
+ them. After ``open()`` returns, ``self.state`` must be a dict
+ containing the user's loop variables (none of the reserved names in
+ ``_RESERVED_STATE_KEYS``).
+
+ Reserved names
+ --------------
+ * ``loop_counter`` / ``LoopStartId`` / ``LoopStartStateURI`` -- live on
+ the StateFrame envelope (``core.models.payload``), not in
+ ``self.state``. Stamped by this operator's worker via
+ ``MainLoop._compute_loop_start_id``.
+ * ``table`` / ``output`` -- transient names only available inside the
+ ``eval_output`` throwaway namespace; never persisted in
+ ``self.state``. See ``_RESERVED_STATE_KEYS``.
+ """
+
+ @overrides.final
+ def process_state(self, state: State, port: int) -> Optional[State]:
+ # First-entry only: merge upstream state into self.state. The nested
+ # pass-through (state already carrying LoopStartStateURI) and all
+ # loop_counter bookkeeping are owned by the worker runtime
+ # (main_loop._process_state_frame), so this operator never sees the
+ # counter and never mutates the State it is handed.
+ self.state.update(state)
+ return None
+
+ @overrides.final
+ def eval_output(self, output_expr: str, table: Table) -> TableLike:
+ # Run the user's `output` expression in a throwaway namespace seeded
+ # with the loop variables and the input `table`. This lets user code
+ # read `table` and define `output` without those reserved names leaking
+ # into -- or being silently clobbered out of -- the persistent loop
+ # state (self.state), addressing the exec-namespace collision.
+ namespace = {**self.state, "table": table}
+ exec("output = " + output_expr, {}, namespace)
+ return namespace["output"]
+
+ @overrides.final
+ def produce_state_on_finish(self, port: int) -> State:
+ from pickle import dumps
+
+ # Emit the user's loop variables plus the pickled input table for the
+ # matching LoopEnd. `table`/`output` are runtime-reserved and are not
+ # kept in self.state, so drop any stray ones before adding the real
+ # pickled table.
+ produced = {
+ key: value
+ for key, value in self.state.items()
+ if key not in _RESERVED_STATE_KEYS
+ }
+ produced["table"] = dumps(Table(self._TableOperator__table_data[port]))
+ return produced
+
+
+class LoopEndOperator(TableOperator):
+ """Base class for the runtime side of a Loop End operator.
+
+ The generator in ``LoopEndOpDesc.scala`` emits a thin
+ ``ProcessLoopEndOperator(LoopEndOperator)`` subclass that wires the
+ user-supplied ``update`` expression into ``process_state(...)`` (via
+ ``run_update``) and the ``condition`` expression into ``condition()``
+ (via ``eval_condition``); all substantive logic lives here.
+
+ Lifecycle
+ ---------
+ * ``process_table(table, port)`` (final) yields each input table
+ through as-is.
+ * ``process_state(state, port)`` is provided by the generated
+ subclass. It calls ``run_update(update_code, state)`` to unpickle
+ the input table, run the user's ``update`` in a throwaway
+ namespace, stash the table on ``self._loop_table``, and persist
+ only user variables back into ``self.state``. Returns ``None``.
+ * ``condition()`` is the abstract method the generated subclass
+ implements by delegating to ``eval_condition(...)`` against the
+ user's ``condition`` expression. Called by ``MainLoop.complete()``
+ to decide whether to fire the back-edge via
+ ``_jump_to_loop_start``.
+
+ Subclass contract
+ -----------------
+ The generated subclass overrides ``process_state()`` (delegating to
+ ``run_update``) and ``condition()`` (delegating to
+ ``eval_condition``). All other methods are ``@overrides.final``; do
+ not override them.
+
+ Reserved names
+ --------------
+ Same as ``LoopStartOperator``: ``loop_counter`` / ``LoopStartId`` /
+ ``LoopStartStateURI`` live on the StateFrame envelope (never in user
+ state); ``table`` / ``output`` are transient names available only
+ inside ``run_update`` / ``eval_condition``'s throwaway namespace and
+ are stripped from ``self.state``. See ``_RESERVED_STATE_KEYS``.
+ """
+
+ @overrides.final
+ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
+ yield table
+
+ @overrides.final
+ def run_update(self, update_code: str, state: State) -> None:
+ # Run the user's `update` in a throwaway namespace seeded with the
+ # incoming loop variables and the unpickled input table, then persist
+ # only the user variables back into self.state. `table`/`output` are
+ # runtime-reserved and never persist, so user code cannot silently
+ # clobber loop machinery through them. The real input table is kept on
+ # self._loop_table so condition() can read it after the update.
+ from pickle import loads
+
+ table = loads(state["table"])
+ namespace = {
+ key: value
+ for key, value in state.items()
+ if key not in _RESERVED_STATE_KEYS
+ }
+ namespace["table"] = table
+ exec(update_code, {}, namespace)
+ self._loop_table = table
+ self.state = {
+ key: value
+ for key, value in namespace.items()
+ if key not in _RESERVED_STATE_KEYS
+ }
+
+ @overrides.final
+ def eval_condition(self, condition_expr: str) -> bool:
Review Comment:
Confirmed reachable, and fixed in 0b01d01261 (initialized in `__init__`, per
your suggestion, plus a guard).
`MainLoop.complete()` calls `condition()` on every LoopEnd, and
`eval_condition` reads `self.state` / `self._loop_table`, which only
`run_update` assigns. So a LoopEnd that finishes without consuming a matching
state would raise `AttributeError`. This happens for an inner LoopEnd that only
forwarded outer-loop pass-through state (`loop_counter > 0` is handled and
returned in `_process_state_frame` before the operator runs), or for a loop in
which the matching branch never consumes a state.
The `_StubLoopEnd` test stub was hiding this by pre-seeding
`self.state = {}` in its own `__init__`; the generated `ProcessLoopEndOperator`
has no `__init__`/`open`, so it had no such default.
Fix:
- `LoopEndOperator.__init__` now initializes `self.state = {}`,
`self._loop_table = None`, and `self._consumed_state = False`. The generated
operator inherits it.
- `run_update` sets `self._consumed_state = True` after the consume.
- `eval_condition` returns `False` when nothing has been consumed — the loop
never iterated at this LoopEnd, so it must not fire the back-edge. Bare field
init alone wasn't enough: `eval_condition` would otherwise `exec` the user's
condition (e.g. `i < len(table)`) against an empty namespace and raise
`NameError` on the undefined loop variable.
Tests: unmasked `_StubLoopEnd` (dropped its `self.state = {}` so it mirrors
the generated operator and exercises the base `__init__`); added
`test_condition_returns_false_before_any_state_is_consumed` (your exact
scenario — `condition()` with no prior consume returns `False`, no raise) and
`test_consumed_flag_flips_after_run_update`. 16 loop-operator + 33
main_loop/output_manager tests pass; no behavior change on the normal consume
path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]