aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3407008439


##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,177 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+# Names the loop runtime owns inside the eval namespaces and across the loop
+# boundary; explicitly stripped from every state dict that crosses Loop
+# Start / Loop End so user code can neither read nor persist them.
+# Other reserved names that used to live in user state -- ``loop_counter``,
+# ``LoopStartId``, ``LoopStartStateURI`` -- are no longer in ``self.state``
+# at all; they ride the StateFrame envelope (see ``core.models.payload``)
+# and are stamped/captured by ``MainLoop._process_state_frame``.
+_RESERVED_STATE_KEYS: frozenset = frozenset({"table", "output"})
+
+
+class LoopStartOperator(TableOperator):
+    """Base class for the runtime side of a Loop Start operator.
+
+    The generator in ``LoopStartOpDesc.scala`` emits a thin
+    ``ProcessLoopStartOperator(LoopStartOperator)`` subclass that does
+    nothing more than wire the user-supplied ``initialization`` and
+    ``output`` expressions into ``open()`` and ``process_table()``; all
+    substantive logic lives here.
+
+    Lifecycle
+    ---------
+    * ``open()`` runs once when the worker starts. The generated subclass
+      executes the user's ``initialization`` against a fresh ``self.state``
+      dict; after it returns ``self.state`` holds *only* the user's loop
+      variables.
+    * ``process_state(state, port)`` (final) runs once when upstream sends
+      this LoopStart its input state; it merges that state into
+      ``self.state``. The nested pass-through branch and all
+      ``loop_counter`` bookkeeping live in
+      ``MainLoop._process_state_frame``, not here.
+    * ``process_table(table, port)`` is provided by the generated subclass
+      and yields a downstream row via ``eval_output(...)`` against the
+      user's ``output`` expression.
+    * ``produce_state_on_finish(port)`` (final) emits the state crossing
+      the boundary to the matching LoopEnd: user variables plus the
+      pickled input table.
+
+    Subclass contract
+    -----------------
+    The generated subclass overrides ``open()`` and ``process_table()``
+    only. All other methods are ``@overrides.final``; do not override
+    them. After ``open()`` returns, ``self.state`` must be a dict
+    containing the user's loop variables (none of the reserved names in
+    ``_RESERVED_STATE_KEYS``).
+
+    Reserved names
+    --------------
+    * ``loop_counter`` / ``LoopStartId`` / ``LoopStartStateURI`` -- live on
+      the StateFrame envelope (``core.models.payload``), not in
+      ``self.state``. Stamped by this operator's worker via
+      ``MainLoop._compute_loop_start_id``.
+    * ``table`` / ``output`` -- transient names only available inside the
+      ``eval_output`` throwaway namespace; never persisted in
+      ``self.state``. See ``_RESERVED_STATE_KEYS``.
+    """
+
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        # First-entry only: merge upstream state into self.state. The nested
+        # pass-through (state already carrying LoopStartStateURI) and all
+        # loop_counter bookkeeping are owned by the worker runtime
+        # (main_loop._process_state_frame), so this operator never sees the
+        # counter and never mutates the State it is handed.
+        self.state.update(state)
+        return None
+
+    @overrides.final
+    def eval_output(self, output_expr: str, table: Table) -> TableLike:
+        # Run the user's `output` expression in a throwaway namespace seeded
+        # with the loop variables and the input `table`. This lets user code
+        # read `table` and define `output` without those reserved names leaking
+        # into -- or being silently clobbered out of -- the persistent loop
+        # state (self.state), addressing the exec-namespace collision.
+        namespace = {**self.state, "table": table}
+        exec("output = " + output_expr, {}, namespace)
+        return namespace["output"]
+
+    @overrides.final
+    def produce_state_on_finish(self, port: int) -> State:
+        from pickle import dumps
+
+        # Emit the user's loop variables plus the pickled input table for the
+        # matching LoopEnd. `table`/`output` are runtime-reserved and are not
+        # kept in self.state, so drop any stray ones before adding the real
+        # pickled table.
+        produced = {
+            key: value
+            for key, value in self.state.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+        produced["table"] = dumps(Table(self._TableOperator__table_data[port]))

Review Comment:
   Re-applied in 620edeb4fc (lost in a force-rebase). Both halves:
   
   **1. Name-mangled access → accessor.** Added `TableOperator._buffered_table(port)`; inside the class, `self.__table_data` resolves via normal name mangling, so a rename of `TableOperator` stays transparent. `LoopStartOperator.produce_state_on_finish` now goes through it instead of `self._TableOperator__table_data[port]`.
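Below is a minimal sketch of the accessor pattern. It uses a simplified, hypothetical `TableOperator` that buffers rows per port in a `defaultdict(list)`, plus an equally hypothetical `buffer_row` helper and `LoopStartLike` subclass. None of these are the actual Texera classes. Inside the class body, Python rewrites `self.__table_data` to `self._TableOperator__table_data`. The accessor therefore keeps working if the class is renamed, whereas a subclass that spells out the mangled name would silently break.

```python
from collections import defaultdict
from typing import Any, Dict, List


class TableOperator:
    """Simplified stand-in for the real base class (hypothetical)."""

    def __init__(self) -> None:
        # Stored under the mangled name `_TableOperator__table_data`.
        self.__table_data: Dict[int, List[Any]] = defaultdict(list)

    def buffer_row(self, row: Any, port: int) -> None:
        # Hypothetical helper standing in for the real per-port buffering.
        self.__table_data[port].append(row)

    def _buffered_table(self, port: int) -> List[Any]:
        # Inside the class body, `self.__table_data` resolves via normal
        # name mangling, so renaming the class needs no change here.
        return self.__table_data[port]


class LoopStartLike(TableOperator):
    def rows_for(self, port: int) -> List[Any]:
        # Goes through the accessor instead of hard-coding the mangled
        # attribute name `self._TableOperator__table_data`.
        return self._buffered_table(port)


op = LoopStartLike()
op.buffer_row({"x": 1}, port=0)
op.buffer_row({"x": 2}, port=0)
op.buffer_row({"y": 9}, port=1)
print(op.rows_for(0))  # [{'x': 1}, {'x': 2}]
print(op.rows_for(1))  # [{'y': 9}]
```

In the real operator, the buffered rows are then wrapped in `Table(...)` before serialization. The sketch keeps them as plain lists.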
   
   **2. Pickle → Apache Arrow IPC.** Both pickle uses are in `operator.py` base helpers on this tip (`dumps` in `produce_state_on_finish`, `loads` in `run_update`), so this is a pure Python-side fix with no codegen change. New `table_to_ipc_bytes` / `table_from_ipc_bytes` helpers in `table.py` (built on `pyarrow.ipc`) handle the conversion: the producer encodes, the consumer decodes. Arrow IPC is length-prefixed and schema-typed and carries no callable payload, so the RCE surface of calling `pickle.loads` on Iceberg data is gone. The wire shape (bytes in `state["table"]`) is unchanged; only the encoding differs.
   
   **Tests:** `TestBufferedTableAccessor` pins the accessor (incl. per-port keying); the produce-state test asserts that the bytes parse as an Arrow IPC stream (stronger than a no-pickle-prefix check) and round-trip back to the same tuples; the matching-branch tests feed Arrow bytes. 14 loop-operator tests pass; ruff clean.
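The following is a rough illustration of what a per-port accessor test can look like. It uses `unittest` against a hypothetical stand-in class, and the test names are illustrative. The real `TestBufferedTableAccessor` lives in the Texera test suite and is not reproduced here.

```python
import unittest
from collections import defaultdict


class TableOperator:
    """Hypothetical stand-in: rows buffered per input port."""

    def __init__(self):
        self.__table_data = defaultdict(list)

    def _buffered_table(self, port):
        return self.__table_data[port]


class TestBufferedTableAccessorSketch(unittest.TestCase):
    def test_accessor_reads_mangled_buffer(self):
        op = TableOperator()
        # Seed through the mangled name, the way the old subclass code read it.
        op._TableOperator__table_data[0].append("row-a")
        self.assertEqual(op._buffered_table(0), ["row-a"])

    def test_accessor_is_keyed_per_port(self):
        op = TableOperator()
        op._TableOperator__table_data[0].append("row-a")
        op._TableOperator__table_data[1].append("row-b")
        self.assertEqual(op._buffered_table(0), ["row-a"])
        self.assertEqual(op._buffered_table(1), ["row-b"])
        # A port that never received rows yields an empty buffer.
        self.assertEqual(op._buffered_table(2), [])
```

Both pytest and `python -m unittest` collect a `unittest.TestCase` subclass like this one.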
   
   (The unrelated `test_tuple.py::test_hash` OSError on my machine is a pre-existing Windows issue in `tuple.py` hashing, not caused by this change.)



-- 
This is an automated message from the Apache Git Service.