aglinxinyuan commented on PR #5700: URL: https://github.com/apache/texera/pull/5700#issuecomment-4700160611
## Review history (migrated from #4206)

This PR was re-opened from a fork (see the description). The code-review discussion happened on the original PR #4206 — reproduced below with the **original author and date** so it isn't lost. The canonical threads (with full inline diff context) remain on [#4206](https://github.com/apache/texera/pull/4206).

_Auto-generated bot comments (Codecov coverage, CI summaries) are omitted — they regenerate on this PR._

<details>
<summary><b>Conversation comments (8)</b></summary>

**@Xiao-zhen-Liu** — 2026-05-19:

> @aglinxinyuan Can you upload the workflows used for testing in the PR description?

**@aglinxinyuan** — 2026-05-19:

> > @aglinxinyuan Can you upload the workflows used for testing in the PR description?
>
> Updated.

**@aglinxinyuan** — 2026-05-19:

> I plan to add test cases on a separate PR. What do you think?

**@Xiao-zhen-Liu** — 2026-05-20:

> > I plan to add test cases on a separate PR. What do you think?
>
> I think it makes more sense to include test cases in this PR. Usually test cases are not good candidates for splitting into a future PR.

**@chenlica** — 2026-05-20:

> > > I plan to add test cases on a separate PR. What do you think?
> >
> > I think it makes more sense to include test cases in this PR. Usually test cases are not good candidates for splitting into a future PR.
>
> Agreed.

**@aglinxinyuan** — 2026-05-20:

> Sure, I’ll add test cases in this PR.
>
> For reference, before adding the test cases, this PR already contains 302 lines of changes across 12 files.

**@aglinxinyuan** — 2026-06-09:

> @Xiao-zhen-Liu, please review the PR again.

**@aglinxinyuan** — 2026-06-13:

> Superseded by #5700, which re-opens this from my fork (aglinxinyuan/texera) to satisfy the requirement that contributions come from a fork rather than a branch on the main repo. The code is identical and the labels/CI carry over. Continuing on #5700 — the full review discussion here remains for reference. Thanks!
</details>

<details>
<summary><b>Inline review threads — 38 threads, 67 comments</b></summary>

---

**`amber/src/main/python/core/architecture/packaging/output_manager.py` L228** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435610)

**@Copilot** — 2026-05-18:

> The method name `reset_storage` is too generic for a behavior that only makes sense for LoopEnd operators (it truncates this worker's result and state iceberg tables). The block comment at the constructor even calls it `reset_loopend_storage`, matching the PR description. Renaming the public method to `reset_loopend_storage` (or similar) would make call sites self-documenting and avoid suggesting general-purpose use.

---

**`amber/src/main/python/core/architecture/packaging/output_manager.py` L228** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892254)

**@Xiao-zhen-Liu** — 2026-05-22:

> `reset_storage` has no description. The name doesn't say what the method actually does, which is to delete and recreate two iceberg tables. It lives on `OutputManager`, a general class, but the only caller is a single branch in `main_loop.py:237` — neither the method's name nor its location hint at that. It assumes `set_up_port_storage_writer` was called first and that the operator has exactly one output port; neither is checked. It has no tests.
>
> The PR description ("Truncate LoopEnd's iceberg tables at each iteration boundary") makes it sound like a class-wide property of LoopEnd. The method is actually called only from one runtime path; in a nested loop with multiple LoopEnds, not every LoopEnd resets on every invocation.
>
> The reason this is correct — downstream readers are paused because the output mode is MATERIALIZED, so they only read after the loop finishes — lives only in the PR description.

**@aglinxinyuan** — 2026-05-27:

> Addressed in e6bea518f2. (The method is now `reset_output_storage` after an earlier rename, and on the current branch it recreates just the one output result table — the state table is handled separately in `save_state_to_storage_if_needed`.)
>
> * **Docstring**: it now says what the method does (drop + recreate the single output table, bracketed by closing the old writer and opening a fresh one), that it is called only by a Loop End worker once per iteration, and — the reasoning that previously lived only in the PR description — *why* truncating live storage is safe: a loop runs in MATERIALIZED mode, so downstream operators don't read the table until the loop has finished, so no reader observes the intermediate truncation.
> * **Preconditions checked**: the two previously-implicit assumptions now raise a clear `RuntimeError` instead of silently resetting the wrong port / raising a bare `KeyError` — (1) exactly one output port, (2) `set_up_port_storage_writer` already ran for it.
> * **Tests**: new `test_output_manager.py` covers the happy path (recreate bracketed by close→reopen) and both guard failures, with the iceberg/thread collaborators mocked so it stays hermetic.
>
> On the location/naming: it stays on `OutputManager` because it operates entirely on that class's private writer/URI state; the docstring now makes the single-caller, Loop-End-only usage explicit so the general location doesn't mislead.

---

**`amber/src/main/python/core/architecture/packaging/output_manager.py` L237** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851469)

**@Xiao-zhen-Liu** — 2026-06-12:

> **Resolved thread 17, but the fix isn't in the branch.** The reply described a rename to `reset_output_storage`, a docstring, and guards (commit `e6bea518f2`), but the method here is still `reset_storage`, undocumented, with no checks. The reason truncation is safe (downstream is paused while in MATERIALIZED mode) lives only in the PR description. Please re-open, or point me at the commit if it was lost.

**@aglinxinyuan** — 2026-06-12:

> You're right — that earlier work was lost when the branch was rebased/squashed. Re-applied on the current branch in 2095b592ee.
>
> * **Renamed** `reset_storage` → `reset_output_storage` (and updated the caller in `main_loop.py` plus the two `__init__` / `set_up_port_storage_writer` comments that already referenced the intended name).
> * **Docstring** now states what it does (drop + recreate the result AND state tables, then reopen the writers), that it's called only by a Loop End worker once per iteration, and — the part that previously lived only in the PR description — **why truncating live storage is safe**: a loop forces MATERIALIZED execution mode, so downstream operators don't begin reading this output until the loop region has fully completed; no reader can observe an intermediate truncation.
> * **Guards**: the two previously-implicit preconditions now raise a clear `RuntimeError` instead of silently resetting the wrong port or dereferencing `None` — (1) exactly one output port, (2) `set_up_port_storage_writer` already ran (`_storage_uri_base` populated).
> * **Tests**: new `TestResetOutputStorage` in `test_output_manager.py` covers the happy path (close → recreate result+state docs → reopen writer) and both guard failures, with the iceberg/thread collaborators mocked.
>
> 9/9 `test_output_manager.py` and 24/24 `test_main_loop.py` tests green.

---

**`amber/src/main/python/core/architecture/packaging/output_manager.py` L239** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435585)

**@Copilot** — 2026-05-18:

> `reset_storage` blindly dereferences `self._storage_uri_base` and `self.get_port_ids()[0]`. If it is ever invoked before `set_up_port_storage_writer` has run (e.g., a LoopEnd that received state without yet having its output writer provisioned, or an operator with zero output ports), this will raise `TypeError`/`IndexError` with no context. Add a guard (or assertion with a descriptive error) and consider asserting that the executor really is a LoopEnd at the call site so this state-mutation is scoped to where it is meaningful.

---

**`amber/src/main/python/core/architecture/packaging/output_manager.py` L260** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851478)

**@Xiao-zhen-Liu** — 2026-06-12:

> **Possible data loss.** This recreates both the result and state tables every iteration (`override_if_exists=True`), but `RegionExecutionCoordinator.scala:579-589` deliberately does *not* recreate LoopEnd's documents on a re-run — its comment says recreating them "would erase what we just wrote." The two paths look contradictory; how is the accumulated output not erased? (Also: the description says this only runs for the inner LoopEnd of a nested loop, but `main_loop.py:267` calls it for any LoopEnd with output state.)

**@aglinxinyuan** — 2026-06-13:

> Good catch — this is a real latent bug, deeper than the apparent contradiction. Fixed in e61681d695.
>
> The two paths don't actually conflict at runtime, because **the Python reset never fires**: a Loop End's generated `process_state` returns `None` (and `produce_state_on_finish` isn't overridden, so it's `None` too), so `output_state` is always `None` for a Loop End — and `reset_output_storage()` sat under `if output_state is not None:`. On top of that it was hooked in `process_input_state` (the consume path, `loop_counter == 0`), not the outer pass-through (`loop_counter > 0`) where it belongs. So your parenthetical was sharper than it looked: not only is the description's "inner Loop End only" claim not matched by the code — the call was effectively dead for *every* Loop End.
>
> So today every Loop End just **accumulates**, which is actually correct for a single / outermost loop (the scheduler's `reusesOutputStorageOnReExecution` keeps the doc across re-runs and the writer appends — that's the `RegionExecutionCoordinator` "would erase what we just wrote" path, and it's right). The gap is the **nested** case: an inner Loop End should accumulate only within the current outer iteration and reset when the outer loop advances. With the reset dead, the inner Loop End accumulated across all outer iterations (9 rows in the 3×3 case instead of 3).
>
> Also — my earlier description edit (the one that prompted this) was wrong: I'd "corrected" the description toward the buggy code. The *original* wording (inner Loop End of a nested loop, `loop_counter > 0` pass-through) described the intent correctly, and I've restored it.
>
> **Fix:**
> - Move `reset_output_storage()` to the inner-Loop-End pass-through branch in `_process_state_frame` (`loop_counter > 0`). The input reader replays all states before any data each region execution, so the tables still hold the *previous* outer iteration's rows when the outer boundary state passes through — clearing there makes each outer iteration accumulate from empty.
> - It fires exactly **once per outer iteration**: each loop operator is its own region, so the inner Loop Start's region doesn't carry `reusesOutputStorageOnReExecution` and its output is recreated on every inner back-edge — the outer pass-through therefore only reaches the inner Loop End on the first inner iteration of each outer iteration. A single / outermost Loop End never sees `loop_counter > 0`, so it never resets.
> - Removed the dead consume-path call; corrected the `reset_output_storage` docstring, the call-site comment, and the `RegionExecutionCoordinator` comment. Scala side unchanged — it provides the base per-loop accumulation this reset carves the nested exception out of.
>
> **Tests:**
> - Unit (`test_main_loop`): the inner pass-through triggers `reset_output_storage` once and doesn't invoke the operator; the consume path (single loop) and a Loop Start pass-through never reset.
> - Integration (`LoopIntegrationSpec`): assert the **materialized** result row counts — single loop = 3 (accumulate), nested inner Loop End = 3 (not 9), nested outer Loop End = 9. The pre-existing cumulative output-tuple counts can't distinguish accumulate from reset, which is why this slipped through. Verified locally via the unit tests + ruff; the materialized integration assertions run in the `amber-integration` CI job.

---

**`amber/src/main/python/core/models/operator.py` L300** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435497)

**@Copilot** — 2026-05-18:

> `LoopStartOperator.process_state` does `state["loop_counter"] += 1` when the incoming state is from an outer scope. This silently assumes the incoming state already contains a `loop_counter` key, which is only true if the outer scope was also produced by a `LoopStart` whose `open()` initialized it. Any non-loop upstream that supplies a state carrying `LoopStartStateURI` (or an outer LoopStart whose user `initialization` happens not to define `loop_counter`) will raise `KeyError` at runtime. Either default-initialize via `state.get("loop_counter", 0) + 1` or assert the precondition with a clear error.

---

**`amber/src/main/python/core/models/operator.py` L300** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892258)

**@Xiao-zhen-Liu** — 2026-05-22:

> On the nested-loop branch, this mutates the dict it was passed (`state["loop_counter"] += 1`) and returns it. The runtime, not this operator, owns that dict.

**@aglinxinyuan** — 2026-06-01:

> Addressed by moving `loop_counter` out of the `State` content dict entirely (latest: 63d243353). The loop operators never read or mutate it. It rides on the `StateFrame` transport envelope and the worker runtime owns it: `main_loop._process_state_frame` applies the `+1`/`-1` and handles the LoopStart/LoopEnd nested pass-through before the operator runs (so the generated LoopEnd is now consume-only). It is materialized/serialized as its own `loop_counter` column parallel to `content`: `State.SCHEMA` is the two-column schema and `State.to_tuple(loop_counter)` writes both columns, while `from_tuple` returns the bare State (the readers that need the counter read the column directly). The user state JSON stays clean. Operator-level counter coverage was relocated to `main_loop` runtime tests.

---

**`amber/src/main/python/core/models/operator.py` L309** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892259)

**@Xiao-zhen-Liu** — 2026-05-22:

> Two issues on this line:
> - `self._TableOperator__table_data[port]` reads a parent class's private field by writing out its name-mangled form. This depends on the parent being named exactly `TableOperator`; renaming the parent silently breaks this.
> - The table is pickled to bytes, then stored inside a state dict that is serialized as a JSON string (the `State` schema is `{CONTENT: STRING}`). The table makes a `pickle → bytes → JSON-string → iceberg` trip every iteration. `pickle.loads` of data anyone can write to is a remote-code-execution surface.

**@aglinxinyuan** — 2026-06-04:

> Both issues fixed in e281c61b4c.
>
> **1. Name-mangled access.** Added a protected `TableOperator._buffered_table(port)` accessor; inside the class `self.__table_data` resolves normally so a rename of `TableOperator` stays transparent. `LoopStartOperator.produce_state_on_finish` now goes through it instead of `self._TableOperator__table_data[port]`.
>
> **2. Pickle as RCE surface.** Swapped the bytes format from pickle to Apache Arrow IPC — structured + typed, no callable payload, parse errors raise at read time. Two new helpers in `core/models/table.py`:
>
> * `table_to_ipc_bytes(table) -> bytes` (sender side, used by Loop Start)
> * `table_from_ipc_bytes(buf) -> Table` (receiver side, emitted by the codegen for Loop End)
>
> The codegen in `LoopEndOpDesc.scala` now emits `from core.models.table import table_from_ipc_bytes; self.state["table"] = table_from_ipc_bytes(self.state["table"])` in place of the prior `from pickle import loads` lines. The wire shape (bytes-in-`state["table"]`) is unchanged; only the format swaps.
>
> **Tests:**
> * New `core/models/test_loop_operators.py` (8 tests, all green): pins the accessor, the Arrow IPC round-trip across mixed/single-row/empty tables, that the serialized bytes parse as an Arrow IPC stream (stronger than a pickle-prefix check), that malformed input raises at parse time, and the end-to-end Loop Start sender path.
> * Extended `LoopOpDescsSpec`: asserts the generated Loop End source imports `table_from_ipc_bytes` and contains no `pickle` reference at all.
>
> Diff scoped to 5 files (operator.py, table.py, test_loop_operators.py, LoopEndOpDesc.scala, LoopOpDescsSpec.scala) — no unrelated churn. The URI-in-state alternative was considered but rejected as ~4× the diff with new cleanup plumbing; the surgical pickle→Arrow swap fully resolves both concerns.

---

**`amber/src/main/python/core/models/operator.py` L310** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435541)

**@Copilot** — 2026-05-18:

> Reaching into `self._TableOperator__table_data` from a subclass relies on Python's private name-mangling and tightly couples `LoopStartOperator` to the internal storage detail of `TableOperator`. Any rename of `TableOperator.__table_data` (a private attribute, so legitimately renameable without notice) silently breaks loops. Consider exposing a protected accessor on `TableOperator` (e.g., `_get_table_data(port)`) and using it here, or store the pickled table inside `process_table` instead.
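***

A minimal sketch of the accessor pattern discussed in the two name-mangling threads above, using hypothetical stand-in classes (these `TableOperator` / `LoopStartOperator` bodies, `process_tuple`, and `buffered_rows` are illustrations, not the real Texera code, and the buffer is a plain list rather than a `Table`). Python rewrites `self.__table_data` to `self._TableOperator__table_data` based on the class whose body contains the expression, so only code written inside `TableOperator` should touch it. A subclass that goes through `_buffered_table(port)` keeps working if the parent is renamed, while a hand-spelled mangled name does not.

```python
from collections import defaultdict
from typing import Any, DefaultDict, List


class TableOperator:
    """Hypothetical stand-in for a table operator that buffers tuples per port."""

    def __init__(self) -> None:
        # Written inside class TableOperator, so Python stores this attribute
        # under the mangled name `_TableOperator__table_data`.
        self.__table_data: DefaultDict[int, List[Any]] = defaultdict(list)

    def process_tuple(self, tuple_: Any, port: int) -> None:
        self.__table_data[port].append(tuple_)

    def _buffered_table(self, port: int) -> List[Any]:
        # Protected accessor: the mangled name is resolved here, inside the
        # defining class, so subclasses never have to spell it out.
        return self.__table_data[port]


class LoopStartOperator(TableOperator):
    """Hypothetical subclass that needs the buffered table for one port."""

    def buffered_rows(self, port: int) -> List[Any]:
        # Fragile form flagged in review (breaks if TableOperator is renamed):
        #     return self._TableOperator__table_data[port]
        return self._buffered_table(port)


if __name__ == "__main__":
    op = LoopStartOperator()
    op.process_tuple({"i": 0}, port=0)
    op.process_tuple({"i": 1}, port=0)
    op.process_tuple({"x": 9}, port=1)
    print(op.buffered_rows(0))  # [{'i': 0}, {'i': 1}]
    # The attribute exists only under its mangled name.
    print(hasattr(op, "__table_data"), hasattr(op, "_TableOperator__table_data"))  # False True
```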
---

**`amber/src/main/python/core/models/operator.py` L386** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851484)

**@Xiao-zhen-Liu** — 2026-06-12:

> **Resolved thread 19, but the fix isn't in the branch.** The reply described a `TableOperator._buffered_table(port)` accessor and an Arrow replacement for pickle (commit `e281c61b4c`); neither is here. This still reads the parent's private field through the mangled name `self._TableOperator__table_data[port]` (which breaks silently if `TableOperator` is renamed), and the table is still moved with `pickle` (`loads` at line 444) — the remote-code-execution risk from the first review. Please re-open both.

**@aglinxinyuan** — 2026-06-13:

> Re-applied in 620edeb4fc (lost in a force-rebase). Both halves:
>
> **1. Name-mangled access → accessor.** Added `TableOperator._buffered_table(port)`; inside the class `self.__table_data` resolves via normal name mangling, so a rename of `TableOperator` stays transparent. `LoopStartOperator.produce_state_on_finish` now goes through it instead of `self._TableOperator__table_data[port]`.
>
> **2. Pickle → Apache Arrow IPC.** Both pickle uses are in `operator.py` base helpers on this tip (`dumps` in `produce_state_on_finish`, `loads` in `run_update`), so it's a pure Python-side fix — no codegen change. New `table_to_ipc_bytes` / `table_from_ipc_bytes` in `table.py` (built on `pyarrow.ipc`); the producer encodes, the consumer decodes. Arrow IPC is length-prefixed and schema-typed with no callable payload, so the `pickle.loads`-on-iceberg-data RCE surface is gone. The wire shape (bytes in `state["table"]`) is unchanged — only the format.
>
> **Tests:** `TestBufferedTableAccessor` pins the accessor (incl. per-port keying); the produce-state test asserts the bytes parse as an Arrow IPC stream (stronger than a no-pickle-prefix check) and round-trip back to the same tuples; the matching-branch tests feed Arrow bytes. 14 loop-operator tests pass; ruff clean.
>
> (The unrelated `test_tuple.py::test_hash` OSError on my machine is a pre-existing Windows issue in `tuple.py` hashing, not from this change.)

---

**`amber/src/main/python/core/models/operator.py` L442** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851487)

**@Xiao-zhen-Liu** — 2026-06-12:

> `eval_condition` reads `self._loop_table` and `self.state`, which are only created in `run_update`. `complete()` calls `condition()` for any LoopEnd. If a LoopEnd finishes without having consumed state (empty input, or an inner LoopEnd that only passed outer-loop state through), this raises `AttributeError`. Please confirm that can't happen, or initialize both in `__init__`.

**@aglinxinyuan** — 2026-06-13:

> Confirmed reachable, and fixed in 0b01d01261 (initialized in `__init__`, per your suggestion, plus a guard).
>
> `MainLoop.complete()` calls `condition()` on every LoopEnd, and `eval_condition` reads `self.state` / `self._loop_table`, which only `run_update` assigns. So a LoopEnd that finishes without consuming a matching state — an inner LoopEnd that only forwarded outer-loop pass-through state (`loop_counter > 0` is handled and returned in `_process_state_frame` before the operator runs), or a loop with no matching-branch consume — would raise `AttributeError`. The `_StubLoopEnd` test stub was hiding this by pre-seeding `self.state = {}` in its own `__init__`; the generated `ProcessLoopEndOperator` has no `__init__`/`open`, so it had no such default.
>
> Fix:
> - `LoopEndOperator.__init__` now initializes `self.state = {}`, `self._loop_table = None`, and `self._consumed_state = False`. The generated operator inherits it.
> - `run_update` sets `self._consumed_state = True` after the consume.
> - `eval_condition` returns `False` when nothing has been consumed — the loop never iterated at this LoopEnd, so it must not fire the back-edge. Bare field init alone wasn't enough: `eval_condition` would otherwise `exec` the user's condition (e.g. `i < len(table)`) against an empty namespace and raise `NameError` on the undefined loop variable.
>
> Tests: unmasked `_StubLoopEnd` (dropped its `self.state = {}` so it mirrors the generated operator and exercises the base `__init__`); added `test_condition_returns_false_before_any_state_is_consumed` (your exact scenario — `condition()` with no prior consume returns `False`, no raise) and `test_consumed_flag_flips_after_run_update`. 16 loop-operator + 33 main_loop/output_manager tests pass; no behavior change on the normal consume path.

---

**`amber/src/main/python/core/models/operator.py` L452** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892263)

**@Xiao-zhen-Liu** — 2026-05-22:

> `LoopEndOperator` doesn't itself declare `process_state`, but the generator overrides `process_state` anyway. To understand what either loop operator does, you have to read three files together: the Scala generator template, this base class, and its parents. The rules that tie them together — which method runs when, which keys are reserved (`loop_counter`, `table`, `output`, `LoopStartId`, `LoopStartStateURI`), what `self.state` must contain by the time `open()` returns — are not encoded anywhere as code; they're string conventions shared across files.

**@aglinxinyuan** — 2026-06-04:

> Closed in 873bd33d87. Most of the substance had already landed; this commit adds the discoverability layer.
>
> | Concern | Status |
> |---|---|
> | Generator overrides `process_state` opaquely | **Fixed in 411d92f67** — the LoopStart/End generator templates collapsed to thin delegates. `LoopStart` does `yield self.eval_output($output, table)`; `LoopEnd` does `self.run_update($update, state)` and `return self.eval_condition($condition)`. All substantive logic lives in the Python base classes (`eval_output`, `run_update`, `eval_condition`). |
> | Reserved names as string conventions | **Mostly encoded** in prior commits — `loop_counter` / `LoopStartId` / `LoopStartStateURI` are typed fields on `StateFrame` (`core/models/payload.py`), not string keys in user state. `table` / `output` were filtered out of `self.state` by hard-coded logic in each helper. |
> | Logic split across 3 files | **Generator already collapsed**; `LoopStartOpDescSpec` / `LoopEndOpDescSpec` pin that the emitted code uses only the base helpers (`code should include("self.eval_output(")` etc.) and contains no `loop_counter` logic. |
> | "Which method runs when" / "what `self.state` must contain after `open()` returns" — not encoded anywhere as code-level prose | **Closed here** — class-level docstrings on `LoopStartOperator` and `LoopEndOperator` now document the lifecycle, subclass contract, and reserved-name space inline with the code. Discoverable via `help(LoopStartOperator)`. |
> | Reserved-key set had no single discoverable source of truth | **Closed here** — new `_RESERVED_STATE_KEYS = frozenset({"table", "output"})` constant; the three filter sites in `eval_output` / `run_update` / `produce_state_on_finish` now read against this single source. `TestReservedStateKeysConstant` pins the set's contents (and that envelope-only names like `loop_counter` are NOT in it). |
>
> 12/12 tests in `test_loop_operators.py` green (3 new + 9 existing). Diff scoped to `operator.py` (docstrings + constant + helper rewrites) and `test_loop_operators.py` (one new test class).

---

**`amber/src/main/python/core/runnables/main_loop.py` L100** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435450)

**@Copilot** — 2026-05-18:

> Deriving the LoopStart operator id from `worker_id` by string-splitting on `-` and `-main-0` is brittle: it silently assumes the worker name ends with `-main-0` and that the operator id itself never contains the literal `-main-0`.
> `SpecialPhysicalOpFactory` and other code paths already produce layer names containing underscores/hyphens, so a future renaming of the layer suffix or worker index will break this without any error. Prefer using the operator identity already available from the worker's context (e.g., the parsed `ActorVirtualIdentity` / physical-op id) rather than re-parsing the worker id string.

---

**`amber/src/main/python/core/runnables/main_loop.py` L100** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892239)

**@Xiao-zhen-Liu** — 2026-05-22:

> `worker_id.split("-", 1)[1].rsplit("-main-0", 1)[0]` recovers the operator ID by chopping up the worker-name string. This depends on the exact worker-naming format. If the format ever changes — an extra dash, a different suffix, more than one worker per operator — the wrong ID is extracted silently, and the back-jump goes to the wrong operator with no error.

**@aglinxinyuan** — 2026-06-01:

> Good catch. Replaced the `-main-0` string-chop with a new `get_operator_id` helper in `core/util/virtual_identity.py` that parses the worker name with the shared `worker_name_pattern` — the same regex the engine already uses for `get_worker_index`, mirroring Scala `VirtualIdentityUtils.getPhysicalOpId`. It no longer assumes the layer name or worker index, correctly handles operator ids that contain dashes, and raises `ValueError` on an unrecognized worker id so a future naming change fails loudly instead of silently extracting the wrong id. Added `TestGetOperatorId` covering the dashed-op-id, non-`main`-layer / nonzero-index, and fail-loud cases. Fixed in 512841a78.

---

**`amber/src/main/python/core/runnables/main_loop.py` L107** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892245)

**@Xiao-zhen-Liu** — 2026-05-22:

> `next(iter(reader_runnables.values()))[0].uri` picks whichever input port happens to be first, with no check that there is only one. If LoopStart ever has more than one input, this silently picks one.

**@aglinxinyuan** — 2026-05-22:

> The design of LoopStart is fixed; it has only one input.

---

**`amber/src/main/python/core/runnables/main_loop.py` L108** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892237)

**@Xiao-zhen-Liu** — 2026-05-22:

> This writes `LoopStartStateURI` — a storage path — into the state object that flows through every operator in the loop body, including user UDFs. The URI is internal runtime data; user code shouldn't see it or be able to write to it. LoopEnd already has the operator ID, so the URI doesn't need to travel through state.

**@aglinxinyuan** — 2026-06-04:

> The substantive concern is closed by prior work; this commit (a374596844) adds the defensive test.
>
> **1. URI is no longer in user state.** Commit `30ba48c39f` moved `loop_counter` / `LoopStartId` / `LoopStartStateURI` onto the typed `StateFrame` envelope (`core/models/payload.py`). The user-facing operator's `process_state(state, port)` callback only ever receives `frame.frame` (the inner `State` dict) — the envelope's scalar fields are siblings, not keys inside it. Grep returns zero `state["LoopStartStateURI"]` / `output_state["LoopStartStateURI"]` accesses anywhere; the runtime captures the URI into `MainLoop._loop_start_state_uri` (`main_loop.py:374-375`) and reads it from there in `_jump_to_loop_start` (`main_loop.py:138`).
>
> **2. New test pins the property end to end.** Added `test_user_state_excludes_envelope_metadata_on_consume_branch` in `test_main_loop.py`: builds a `StateFrame` with envelope metadata, drives it through `_process_state_frame` on the consume branch, and asserts (a) the runtime captured the envelope onto its instance fields, but (b) the operator-facing `state_processing_manager.current_input_state` carries only the inner State's keys — no `LoopStartId` / `LoopStartStateURI` / `loop_counter` as string keys. If a future refactor accidentally merges envelope fields into the inner state, the test breaks.
>
> **3. On the secondary hint** — *"LoopEnd already has the operator ID, so the URI doesn't need to travel through state"*: technically true but requires either a new controller RPC (LoopEnd asks the controller for LoopStart's input URI given the op id) or a runtime URI registry / canonical naming scheme. The envelope already separates the URI from user-visible state, so leaving the envelope field as-is keeps the back-edge write path simple. Happy to pursue removing the envelope field too if you'd prefer that direction — let me know.
>
> 24/24 `test_main_loop.py` tests green; diff scoped to the one test file.

---

**`amber/src/main/python/core/runnables/main_loop.py` L122** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892249)

**@Xiao-zhen-Liu** — 2026-05-22:

> `_attach_loop_start_id` and `_jump_to_loop_start` are the most fragile new methods — worker-name parsing, first-port assumption, state-key stripping, direct iceberg write — and they have no tests.

**@aglinxinyuan** — 2026-06-04:

> Most of the bullets here are already addressed by intermediate commits; this commit (f22738ecb6) closes the remaining gaps.
>
> | Concern | Status |
> |---|---|
> | Worker-name parsing (`split("-",1)[1].rsplit("-main-0",1)[0]`) | **Fixed in 512841a78b** — `_compute_loop_start_id` now delegates to `get_operator_id(...)` in `core/util/virtual_identity.py`, which is exhaustively tested in `test_virtual_identity.py` (canonical, hyphenated op id, non-main layer, digit-ending id, malformed inputs). The brittle inline parse is gone. |
> | State-key stripping list `(LoopStartId, LoopStartStateURI, table, output)` | **Reduced in 30ba48c39f / 007a264b59 / 411d92f67** — `LoopStartId` / `LoopStartStateURI` / `loop_counter` now ride the `StateFrame` envelope, not user state. Only `(table, output)` (the user-exec runtime scratch) is still stripped; that strip is now tested. |
> | First-port assumption (`next(iter(reader_runnables.values()))[0].uri`) | **Fixed in this commit** — `_compute_loop_start_id` raises `RuntimeError` if the input_manager reports more than one input port or more than one reader on the single port, instead of silently picking the dict iterator's first. |
> | No tests | **Closed in this commit** — 7 new `test_main_loop.py` cases covering both methods: worker-id parse via `get_operator_id`, URI = state-channel of reader's result URI (so `VFSURIFactory.state_uri` isn't dropped), both defensive raises, the RPC with `target_operator_id` taken from the StateFrame envelope (never from user state), the scratch-key strip preserving user vars, and the exact iceberg-write contract (`create_document → writer("0") → put_one(State.to_tuple(0)) → close`). |
>
> Diff scoped to `main_loop.py` (defensive guards only) and `test_main_loop.py` (new cases). Loop integration coverage continues to live in `amber/src/test/integration/.../LoopIntegrationSpec.scala`. 23/23 tests in `test_main_loop.py` green.

---

**`amber/src/main/python/core/runnables/main_loop.py` L125** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435651)

**@Copilot** — 2026-05-18:

> The hardcoded state-dict keys `"LoopStartId"`, `"LoopStartStateURI"`, `"loop_counter"`, `"table"`, and `"output"` live in user state alongside arbitrary user variables (the `initialization` block writes into `self.state` via `exec(..., self.state)`). A user whose loop body uses any of these names (especially `table` or `output`, which are the documented defaults in the operator UI) will collide silently: their values get stripped on writeback, and `condition` evaluating `i < len(table)` may suddenly see a pickled bytes object after the strip.
> Consider namespacing the loop machinery under a single reserved key (e.g., `"__loop__": {...}`) so user state is untouched.

---

**`amber/src/main/python/core/runnables/main_loop.py` L137** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435473)

**@Copilot** — 2026-05-18:

> In `complete()`, `executor.condition()` is invoked for every `LoopEndOperator` worker, including nested LoopEnds whose `process_state` only saw the pass-through branch (`loop_counter > 0`) and therefore never executed `self.state = dict(state)`. In that case `self.state` either does not exist or is stale from a previous iteration, so `condition()` may raise `AttributeError`/`KeyError` or — worse — return a stale `True` and fire an unintended `jump_to_operator_region` writing garbage state back to LoopStart. Gate the `condition()`/jump on having actually absorbed a terminal state this iteration (e.g., remember whether `process_state` took the `loop_counter == 0` branch).

---

**`amber/src/main/python/core/runnables/main_loop.py` L137** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892251)

**@Xiao-zhen-Liu** — 2026-05-22:

> Two issues:
> - `condition()` runs the user's Python expression with no error handling. If it throws (typo, wrong variable, divide-by-zero), `executor.close()` is skipped, the worker never finishes its state transition, and the workflow hangs. Same applies to `update`, `initialization`, `output`.
> - Nothing stops an infinite loop. If the user's `condition` never returns False, the workflow runs forever with no iteration counter and no progress log.

**@aglinxinyuan** — 2026-05-22:

> 1. It's expected. We don't provide error handling for user-provided code, and it shares the same design as Python UDF.
> 2. Infinite loop is valid.
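A minimal Python sketch of the gating Copilot proposes for `complete()`. The Loop End remembers whether `process_state` took the `loop_counter == 0` branch in the current iteration, and it evaluates `condition()` only if it did. `LoopEndSketch`, `should_jump_back`, and `_absorbed_this_iteration` are hypothetical names. `condition` is evaluated with plain `eval` over a copy of the state. This is not the PR's implementation.

```python
from typing import Any, Dict, Optional


class LoopEndSketch:
    """Hypothetical Loop End that only evaluates `condition` on freshly absorbed state."""

    def __init__(self, condition_src: str) -> None:
        self.condition_src = condition_src
        self.state: Dict[str, Any] = {}
        # True only after process_state took the matching-loop branch.
        self._absorbed_this_iteration = False

    def process_state(
        self, state: Dict[str, Any], loop_counter: int
    ) -> Optional[Dict[str, Any]]:
        if loop_counter > 0:
            # Nested pass-through: forward the state and leave self.state alone.
            return state
        # Matching loop (loop_counter == 0): absorb the terminal state.
        self.state = dict(state)
        self._absorbed_this_iteration = True
        return None

    def should_jump_back(self) -> bool:
        if not self._absorbed_this_iteration:
            # Nothing absorbed this iteration: never evaluate a stale condition.
            return False
        self._absorbed_this_iteration = False  # consume the flag
        return bool(eval(self.condition_src, {}, dict(self.state)))
```

Clearing the flag inside `should_jump_back` has a useful consequence. A second `complete()` call that arrives without freshly absorbed state cannot fire another jump with stale values.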
---

**`amber/src/main/python/core/runnables/main_loop.py` L160** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851492)

**@Xiao-zhen-Liu** — 2026-06-12:

> On thread 16, the "same as a Python UDF" reply holds for `update` / `initialization` / `output`, but not for `condition()`: it runs here on the main loop thread inside `complete()`, before `close()` and the COMPLETED transition — not on the guarded path a UDF error would take. A typo in `condition` fails the worker thread instead of being reported. Worth guarding this one call, or noting the difference.

**@aglinxinyuan** — 2026-06-13:

> Guarded it — fixed in 212687a8a6.
>
> You're right that `condition()` is the odd one out: `complete()` calls it on the main loop thread, before `close()` and the COMPLETED transition, outside `DataProcessor`'s guarded executor session. An exception there (a typo, an undefined name) propagated through `run()`'s `@logger.catch(reraise=True)` and killed the worker thread silently — the controller never learned of it.
>
> Now it's caught and reported the same way a UDF error is:
> - record it on the exception manager,
> - queue an **ERROR** console message,
> - flush it, then enter `EXCEPTION_PAUSE` — **skipping the loop-back edge and completion**, so the worker pauses with the error showing instead of dying or falsely reporting success.
>
> To keep both paths reporting identically, I pulled the console-message build out of `DataProcessor._report_exception` into a shared `ConsoleMessageManager.report_exception(worker_id, exc_info)`; the data path and this main-loop path now call the same helper.
>
> Tests: added `test_complete_reports_loopend_condition_error_instead_of_crashing` — a Loop End whose `condition()` raises is recorded + reported (ERROR console message) + paused (`EXCEPTION_PAUSE`), with no loop-back and no completion. The full `main_loop`, `data_processor`, and loop-operator suites pass (47 tests); ruff clean.
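The guarded call described in the reply can be sketched in Python. `WorkerSketch`, its fields, and the string states are hypothetical stand-ins for three real pieces: the exception manager, the console-message queue, and the worker state machine. The flush step and `ConsoleMessageManager.report_exception` are not modeled here.

```python
import sys
import traceback
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Tuple


@dataclass
class WorkerSketch:
    """Hypothetical stand-in for a Loop End worker's main-loop bookkeeping."""

    console: List[Tuple[str, str]] = field(default_factory=list)
    recorded: Optional[BaseException] = None
    state: str = "RUNNING"
    jumped: bool = False

    def complete_loop_end(self, condition: Callable[[], bool]) -> None:
        try:
            go_again = condition()
        except Exception:
            exc_type, exc, tb = sys.exc_info()
            # 1. Record the error (stands in for the exception manager).
            self.recorded = exc
            # 2. Queue an ERROR console message with the formatted traceback.
            text = "".join(traceback.format_exception(exc_type, exc, tb))
            self.console.append(("ERROR", text))
            # 3. Pause instead of crashing; skip loop-back and completion.
            self.state = "EXCEPTION_PAUSE"
            return
        if go_again:
            self.jumped = True  # stands in for jump_to_operator_region
        self.state = "COMPLETED"
```

The early `return` is what skips both the loop-back jump and the COMPLETED transition on the error path.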
---

**`amber/src/main/python/core/runnables/main_loop.py` L267** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851498)

**@Xiao-zhen-Liu** — 2026-06-12:

> `reset_storage()` runs for any LoopEnd with output state here, regardless of `loop_counter` — but the description says it only fires for the inner LoopEnd of a nested loop. See the note on `output_manager.py:241`.

**@aglinxinyuan** — 2026-06-13:

> You're right — the code is correct and the description was stale (and, worse, named the wrong branch). Fixed the description, and added a call-site comment in 3f11520450.
>
> `reset_output_storage()` fires **once per iteration for every Loop End**, on the matching-loop consume (`loop_counter == 0`). The nested pass-through (`loop_counter > 0`) is forwarded and returned in `_process_state_frame` *before* `process_input_state` runs, so it never resets. The old description had this backwards on three counts: it said inner-LoopEnd-only (it's every LoopEnd), it named the `loop_counter > 0` pass-through branch (reset is actually the `== 0` consume branch), and it claimed single loops never reset (they reset every iteration). The method's own docstring already states the correct behavior — *"once per loop iteration … each iteration must start from empty tables so the materialization holds only the final iteration's rows"* — so the description was the only wrong artifact.
>
> Why reset-every-iteration is correct, not a bug: dropping+recreating the Loop End's output each iteration is what keeps the materialization at the **final** iteration's rows instead of all iterations concatenated. Gating it to nested-inner-only would break the single-loop case, so no code change.
>
> Done:
> * PR description "Worker output" row rewritten to match the code + docstring.
> * Added a comment at the `reset_output_storage()` call site in `process_input_state` documenting the firing condition (where you were reading), so the next reader doesn't hit the same ambiguity.
>
> No behavior change. 33 main_loop + output_manager tests still pass.

---

**`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala` L583** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851511)

**@Xiao-zhen-Liu** — 2026-06-12:

> **Resolved threads 21 and 22, but the fixes aren't in the branch.** The replies cited commits (`ca9e5ce8cc`, `540b7ba274`, `bbec98282e`) that aren't here. The flag is still the type-named `isLoopEnd`, and this skip-create branch is still untested. Please rename it to describe the behavior, and add a test that pre-creates the documents and checks they're reused, not recreated.

**@aglinxinyuan** — 2026-06-13:

> Both halves now in the branch.
>
> **Rename** (landed in 3d4f15b445): `isLoopEnd` → `reusesOutputStorageOnReExecution` on `PhysicalOp` (+ `withReusesOutputStorageOnReExecution`), the `RegionExecutionCoordinator` guard, `LoopEndOpDesc`, and the specs.
>
> **Test for the skip-create branch** (043ed302c3): I pulled the create-or-reuse decision out of the private `createOutputPortStorageObjects` into a pure companion method:
>
> ```scala
> def provisionOutputDocument(uri, reuseExistingStorage, documentExists, createDocument): Boolean
> ```
>
> with the storage ops injected, so the decision is unit-testable without an iceberg backend or a live region.
> `RegionOutputProvisioningSpec` pins the four cases with a `createDocument` spy — directly the "pre-create, then check reused not recreated" you asked for:
>
> * **reuse + existing document → NOT recreated** (createDocument never called) — accumulated loop output survives the re-run;
> * reuse + no document yet → created (first iteration);
> * no-reuse + existing → recreated/overwritten (fresh every run);
> * no-reuse + none → created;
> * plus: no-reuse short-circuits and never even probes `documentExists`.
>
> On verification: I confirmed the production change compiles — the only remaining `amber` compile errors are the pre-existing `PveManager` / `virtual_environments` JOOQ issue from #5577 (unrelated; my local DB isn't migrated with that table, CI builds against a fresh schema). The new spec is a pure ScalaTest unit with no iceberg/actor dependency, so it runs in the normal amber test job.
>
> (For context on why these kept showing as "cited but not in the branch": the branch has been force-rebased a few times, which dropped the earlier commits — re-applied now on the current tip.)

---

**`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala` L589** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892266)

**@Xiao-zhen-Liu** — 2026-05-22:

> The skip-create branch has no tests. Without this change, loops would wipe their own output every iteration — so this is a load-bearing invariant with no coverage.

**@aglinxinyuan** — 2026-05-27:

> Added coverage in ca9e5ce8cc.
>
> Extracted the per-document create-or-reuse decision out of the private `createOutputPortStorageObjects` into a testable `RegionExecutionCoordinator.provisionOutputDocument(uri, schema, reuseIfExists)`, then added `RegionExecutionCoordinatorSpec` exercising the full truth table against a real iceberg-backed `DocumentFactory`:
>
> | reuseIfExists | doc state before | expected after |
> |---|---|---|
> | false | absent | empty doc created |
> | true | absent (1st iteration) | empty doc created |
> | **true** | **has 3 rows** | **3 rows preserved** ← the load-bearing invariant |
> | false | has 3 rows | wiped to 0 (contrast case) |
>
> The third case is the one you flagged: it proves a re-executing loop region does **not** wipe the output its previous iterations accumulated. The fourth shows the non-reuse path still starts fresh, so the guard is actually doing something. No behavior change — `createOutputPortStorageObjects` just delegates to the helper now.

---

**`amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala` L71** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892233)

**@Xiao-zhen-Liu** — 2026-05-22:

> Two issues:
> - The execution service uses `isInstanceOf[LoopStartOpDesc]` to detect loops, which makes a generic service depend on a specific operator class.
> - This rule (force MATERIALIZED if a LoopStart is in the plan) has no tests.

**@aglinxinyuan** — 2026-05-27:

> Both addressed in 1848ce00fb.
>
> **1. Generic service no longer depends on a specific operator class.** Added `LogicalOp.requiresMaterializedExecution` (default `false`); `LoopStartOpDesc` and `LoopEndOpDesc` override it to `true`. `WorkflowExecutionService` now checks that flag instead of `isInstanceOf[LoopStartOpDesc]` — the `LoopStartOpDesc` import is gone, and any future operator that needs materialized edges just sets the flag.
>
> **2. The rule is now tested.** Extracted it into a pure, side-effect-free `WorkflowExecutionService.resolveWorkflowSettings(operators, requested)` (the constructor body just delegates), and added `WorkflowExecutionServiceSpec`:
>
> * `requiresMaterializedExecution` is true for `LoopStart`/`LoopEnd`, false for a non-loop op (`SleepOpDesc`);
> * loop present (incl. a plan mixing loop + non-loop ops) → coerced to MATERIALIZED;
> * `LoopEnd` alone also coerces;
> * non-loop op and empty plan pass through unchanged;
> * idempotent when the user already chose MATERIALIZED;
> * only the `executionMode` field changes (other settings preserved).
>
> 7 tests, all passing.

---

**`amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala` L74** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892230)

**@Xiao-zhen-Liu** — 2026-05-22:

> When the workflow contains a Loop Start, this block forces the execution mode to MATERIALIZED. The frontend setting still displays whatever the user picked. The UI and the running system disagree, and the user has no way to know.

**@aglinxinyuan** — 2026-05-27:

> Fixed in 30bf1cd136 by failing loud instead of silently coercing.
>
> `WorkflowExecutionService.validateExecutionMode` now throws an `IllegalArgumentException` when the plan contains an operator that requires materialized execution but the requested mode isn't MATERIALIZED:
>
> > This workflow contains operators that require MATERIALIZED execution mode (e.g. Loop Start / Loop End). Please set the execution mode to Materialized in the workflow settings and run again.
>
> `WorkflowService` already wraps the constructor in `try { ... } catch { case e => errorHandler(e) }`, so this surfaces to the UI as a fatal workflow error — the user is told to switch the mode and re-run, rather than the UI and engine silently disagreeing.
> The requirement is still keyed off the generic `LogicalOp.requiresMaterializedExecution` flag (no operator-class dependency), and the spec covers it: loop + non-MATERIALIZED throws (including a LoopEnd-only plan), while a loop already set to MATERIALIZED, a non-loop op, and an empty plan all pass.

**@aglinxinyuan** — 2026-06-09:

> <img width="1417" height="764" alt="image" src="https://github.com/user-attachments/assets/d37f82df-535c-44ba-a6d9-cd1a6525e4df" />

---

**`amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala` L75** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435635)

**@Copilot** — 2026-05-18:

> The coercion to `MATERIALIZED` happens only when a `LoopStartOpDesc` is present, but `LoopEndOpDesc` is what actually requires the iceberg output to persist across region invocations (see `RegionExecutionCoordinator`'s `isLoopEnd` branch). A workflow with a `LoopEndOpDesc` but no `LoopStartOpDesc` (e.g., malformed, partially-edited, or in a future use of LoopEnd as a standalone control op) would silently slip through pipelined mode. Consider also checking for `LoopEndOpDesc` to keep both halves in sync.

---

**`amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala` L78** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851505)

**@Xiao-zhen-Liu** — 2026-06-12:

> **Resolved thread 11, but the fix isn't in the branch.** The reply described a generic `LogicalOp.requiresMaterializedExecution` flag and a test (commit `1848ce00fb`); neither exists. Line 78 still uses `isInstanceOf[LoopStartOpDesc]`, tying this service to one operator class, with no test. It also only checks LoopStart, so a plan with only a LoopEnd would skip the check. Please re-open.

**@aglinxinyuan** — 2026-06-13:

> Re-applied in 7fd110a833 (it was lost in a force-rebase).
>
> All three points:
>
> 1. **No longer tied to a class.** Added `LogicalOp.requiresMaterializedExecution` (default false); `LoopStartOpDesc` and `LoopEndOpDesc` both override it to true. `WorkflowExecutionService` checks the flag, so the `LoopStartOpDesc` import/`isInstanceOf` is gone — any future operator needing materialization just sets the flag.
>
> 2. **LoopEnd-only no longer slips through.** Because the check is `operators.exists(_.requiresMaterializedExecution)` and LoopEnd sets the flag too, a plan with a LoopEnd but no LoopStart now triggers the guard. There's an explicit test for that case.
>
> 3. **Now tested.** Extracted the logic into a pure `WorkflowExecutionService.validateExecutionMode(operators, settings)` (the constructor delegates to it), and added `WorkflowExecutionServiceSpec`:
>    * `requiresMaterializedExecution` true for LoopStart/LoopEnd, false for a non-loop op (SleepOpDesc);
>    * loop + PIPELINED → throws (incl. the LoopEnd-only plan);
>    * loop + MATERIALIZED, non-loop + PIPELINED, and an empty plan → no throw.
>
> Verified locally (after migrating my local DB for the unrelated #5577 JOOQ table): 6 new tests pass, the 29 LoopStart/LoopEnd op-desc specs pass, and amber compiles. The behavior stays fail-loud (throw with an actionable message), per the earlier decision on this thread.

---

**`amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala` L160** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851518)

**@Xiao-zhen-Liu** — 2026-06-12:

> Both cases only check that the workflow reaches COMPLETED, then `succeed` — they never read the output or the iteration count. For a loop, the iteration count is the main thing to verify: a counter bug that still terminates would pass this test. The description's "3 iterations" / "9 nested" are what to assert here.

**@aglinxinyuan** — 2026-06-13:

> Fixed in 962b6c1401 — both tests now assert the iteration count, not just termination.
>
> Each test captures the terminal LoopEnd's cumulative output-tuple count from `ExecutionStatsUpdate` (keyed by logical op id; the controller delivers it after querying final worker stats, before `ExecutionStateUpdate(COMPLETED)`; the worker persists across the `JumpToOperatorRegion` re-executions, so its output statistic accumulates across iterations rather than resetting). Since `LoopEnd.process_table` is an identity pass-through (`yield table`), by conservation that count equals the number of rows that flowed through the loop = the iteration count.
>
> * **Single loop**: `assert(LoopEnd output == 3)` — `i` advances 0,1,2 and stops at `i == 3`. An off-by-one that still terminated lands on 2 or 4.
> * **Nested loop**: I also had to fix the workflow — the outer `LoopStart` previously emitted `table.iloc[i]` (one row), so the inner loop would have run only once per outer iteration (3 total), **not** the 9 the comment claimed. It now emits the whole `table` (matching Nested.Loop.json), so the inner body genuinely runs 3 × 3 = 9 times; `assert(outer LoopEnd output == 9)`.
>
> I asserted only the terminal LoopEnd counts because they're robust by the identity-passthrough conservation argument regardless of nested region-scheduling details; the `9` also matches the Nested.Loop.json run in the PR description. Note these are `@IntegrationTest` (postgres + MinIO + Python workers), so the numbers will be confirmed by the `amber-integration` CI job rather than locally — flagging in case the nested count needs a tweak once it runs there.

---

**`amber/src/test/python/core/models/test_loop_operators.py` L1** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892281)

**@Xiao-zhen-Liu** — 2026-05-22:

> Verify CI picks up this path. Other Python sources are under `amber/src/main/python/...`; if `amber/src/test/python/...` isn't included in the runner config, these tests silently don't run.

**@aglinxinyuan** — 2026-05-22:

> Verified — the tests are picked up:
>
> * `amber/pyproject.toml` declares `testpaths = ["src/test/python"]`, so any `pytest` run from `amber/` discovers anything under `src/test/python/...`.
> * `.github/workflows/build.yml` line 626 runs `cd amber && pytest -m "not integration" ...` from that directory.
> * On the most recent CI run for this PR ([python 3.10 job](https://github.com/apache/texera/actions/runs/26308949081/job/77452693976)), all 12 `test_loop_operators.py` tests ran and passed — grep `test_loop_operators` in the log returns 12 PASSED lines.

---

**`amber/src/test/python/core/models/test_loop_operators.py` L231** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892285)

**@Xiao-zhen-Liu** — 2026-05-22:

> `pytest.raises(TypeError, match="condition")` matches on Python's "missing abstract method" error text. That wording has changed between CPython versions before. Match on `"abstract"` instead, or drop `match`.

**@aglinxinyuan** — 2026-05-22:

> Fixed in c852a7daa0 — `match="abstract"` now, plus a comment recording why (CPython's missing-abstract-method message wording has changed between releases but always contains "abstract").

---

**`common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala` L201** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892270)

**@Xiao-zhen-Liu** — 2026-05-22:

> `isLoopEnd` names the flag after one specific operator type. What the scheduler actually checks is more general — "this operator's output must survive across re-runs of its region." If anything else ever needs the same behavior, we'll either reuse a misnamed flag or add a near-duplicate.

**@aglinxinyuan** — 2026-05-27:

> Good point — fixed in 540b7ba274, with the flag renamed in bbec98282e.
>
> The branch had since moved to detecting loop ends by string-matching the operator id (`startsWith("LoopEnd-operator-")`), which keys the behavior to the operator type even more tightly than `isLoopEnd` did. Replaced both with a behavior-named flag:
>
> * `PhysicalOp.reusesOutputStorageOnReExecution: Boolean = false` — documented as "this operator's output storage should be reused (reopened) rather than recreated fresh when its region is executed more than once", explicitly noting any operator can set it, not just Loop End.
> * `LoopEndOpDesc` sets it via `.withReusesOutputStorageOnReExecution(true)`.
> * `RegionExecutionCoordinator` now checks `region.getOperators.exists(_.reusesOutputStorageOnReExecution)` instead of the id prefix.
>
> The name states exactly what the scheduler does — reopen existing output storage on a region re-run instead of recreating it — so it reasons about the property rather than the operator, and a future operator needing the same treatment just sets the flag.

---

**`common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala` L201** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3400851514)

**@Xiao-zhen-Liu** — 2026-06-12:

> **Resolved thread 22, but the rename isn't in the branch.** Still `isLoopEnd` — named after the operator, not after the behavior the scheduler checks ("keep this operator's output across a region re-run"). Renaming it lets the next operator that needs the same behavior reuse it without a misleading name.

**@aglinxinyuan** — 2026-06-13:

> You're right — the rename was lost in a rebase. Re-applied in 3d4f15b445.
>
> `isLoopEnd` → `reusesOutputStorageOnReExecution` (named after the behavior the scheduler checks — "keep this operator's output storage across a region re-run" — not after LoopEnd):
>
> * `PhysicalOp`: field + `withIsLoopEnd` → `withReusesOutputStorageOnReExecution`, with a doc comment stating the behavior and that any operator needing it can set it.
> * `RegionExecutionCoordinator`: the skip-recreate guard now checks `region.getOperators.exists(_.reusesOutputStorageOnReExecution)` (local val + comment reworded to the behavior).
> * `LoopEndOpDesc` sets `.withReusesOutputStorageOnReExecution(true)`.
> * Loop specs + mixin updated.
>
> So the next operator that must preserve its output across a region re-execution can reuse the flag without a LoopEnd-specific misnomer.
>
> Verified: WorkflowCore + WorkflowOperator compile and all 29 LoopStart/LoopEnd op-desc specs pass. (Heads up: the `amber` module's local compile is currently blocked by an unrelated pre-existing issue — `PveManager` from #5577 references the `virtual_environments` table, which my un-migrated local DB lacks, so JOOQ codegen omits it; that's environmental and CI compiles against a fresh schema. The `RegionExecutionCoordinator` edit is a one-token rename to the now-existing field.)

---

**`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala` L83** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435405)

**@Copilot** — 2026-05-18:

> Same problem as `LoopStartOpDesc`: `update` and `condition` are interpolated directly into double-quoted `exec(...)` literals. Any expression containing `"`, `\`, or a newline will produce invalid Python, and the values are effectively executable code-injection sites. Inline them as plain Python statements (e.g., on their own indented line) or escape them defensively.
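The failure mode in this thread is easy to reproduce with the standard `ast` module. The sketch assumes a simplified generator with two hypothetical helpers:

- `naive_codegen` splices the expression into a quoted `exec(...)` literal.
- `encoded_codegen` embeds the expression as base64 data and decodes it at run time.

Texera's own fix (the `pyb"..."` macro emitting `self.decode_python_template(...)`) is not reproduced here.

```python
import ast
import base64


def naive_codegen(expr: str) -> str:
    # Old pattern: the user text is spliced inside a double-quoted literal.
    return f'exec("{expr}", {{}}, self.state)\n'


def encoded_codegen(expr: str) -> str:
    # Hypothetical alternative: embed the text as base64 data, decode at run time.
    b64 = base64.b64encode(expr.encode("utf-8")).decode("ascii")
    return f'exec(base64.b64decode("{b64}").decode("utf-8"), {{}}, self.state)\n'


def parses(source: str) -> bool:
    # Only checks syntax; the generated code is never executed here.
    try:
        ast.parse(source)
        return True
    except SyntaxError:
        return False


user_expr = 'name = "foo"'
print(parses(naive_codegen(user_expr)))    # False
print(parses(encoded_codegen(user_expr)))  # True
```

The encoded variant is safe because the generated source only ever contains base64 characters (`A-Z`, `a-z`, `0-9`, `+`, `/`, `=`). No user input can therefore close the string literal early, whether it contains quotes, backslashes, or newlines.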
---

**`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala` L37** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892274)

**@Xiao-zhen-Liu** — 2026-05-22:

> User expressions execute against `self.state`, which the runtime also uses to store `loop_counter`, `table`, `output`, `LoopStartId`, `LoopStartStateURI`. A user writing `loop_counter = 0` in their `initialization`, or `table = ...`, silently overwrites loop machinery — with no warning. Same applies to `update` / `condition` in `LoopEndOpDesc`.

**@aglinxinyuan** — 2026-06-03:

> Addressed across 411d92f67 (plus the earlier loop_counter / LoopStartId / LoopStartStateURI moves). The reserved names no longer share the user exec namespace:
>
> - `loop_counter`, `LoopStartId`, `LoopStartStateURI` are now entirely off `State` -- they ride the `StateFrame` envelope as their own materialized columns, owned by the runtime, so they never appear in `self.state` and user code cannot touch them.
> - `table` and `output`: each user expression (`initialization` / `output` / `update` / `condition`) now runs in a throwaway namespace seeded with the user loop variables plus the input `table`; `output` is read back out of it, and only the user variables are persisted into `self.state`. So `table` stays readable, but neither reserved name can persist in -- or be silently clobbered out of -- the loop state.
>
> A user writing `loop_counter = 0`, `table = ...`, etc. in their loop code can therefore no longer overwrite loop machinery. The exec logic lives in tested LoopStart/LoopEnd base helpers (`eval_output` / `run_update` / `eval_condition`); the generated operators just delegate.
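A minimal sketch of the throwaway-namespace approach from the reply. It assumes loop state is a plain `dict` and the input table is any Python object. The helper names mirror the ones mentioned (`eval_output` / `run_update` / `eval_condition`), but their signatures and bodies are hypothetical, not the PR's code.

```python
from typing import Any, Dict

RESERVED = ("table", "output")


def _scratch(state: Dict[str, Any], table: Any) -> Dict[str, Any]:
    # Throwaway namespace: a copy of the user variables plus the input table.
    ns = dict(state)
    ns["table"] = table
    return ns


def _persist(ns: Dict[str, Any]) -> Dict[str, Any]:
    # Keep only user variables; drop reserved names and exec's __builtins__.
    return {
        k: v for k, v in ns.items() if k not in RESERVED and k != "__builtins__"
    }


def run_update(state: Dict[str, Any], table: Any, update_src: str) -> Dict[str, Any]:
    ns = _scratch(state, table)
    exec(update_src, ns)
    return _persist(ns)


def eval_output(state: Dict[str, Any], table: Any, output_src: str) -> Any:
    ns = _scratch(state, table)
    exec("output = " + output_src, ns)
    return ns["output"]


def eval_condition(state: Dict[str, Any], table: Any, condition_src: str) -> bool:
    return bool(eval(condition_src, _scratch(state, table)))
```

Each call starts from a fresh copy of the state. An assignment to `table` or `output` inside user code is therefore visible only within that one expression and never persists into the loop state.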
---

**`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala` L73** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892278)

**@Xiao-zhen-Liu** — 2026-05-22:

> The user's expression is dropped into a string literal: `exec("$initialization", {}, self.state)`. With `initialization = 'name = "foo"'`, the generator produces `exec("name = "foo"", ...)` — invalid Python. The error doesn't surface until the operator's class loads at run time, far from where the user typed the expression. Multi-line input has the same problem. The same pattern appears in `LoopEndOpDesc.scala:78-83`.

**@aglinxinyuan** — 2026-06-04:

> Fixed in 9ec60f07cc.
>
> Switched the four user fields (`initialization`, `output`, `update`, `condition`) to `EncodableString` and rewrote both `generatePythonCode()` with the `pyb"..."` macro from `common/pybuilder`. The macro base64-encodes each splice at build time and emits a `self.decode_python_template('<b64>')` call at runtime — the raw user text never enters the generated source, so quotes/newlines/backslashes/etc. can't break it. It also rejects (at compile time) any splice placed inside a quoted string, so the old `"$initialization"` pattern can't sneak back in. The surrounding `"..."` around the `exec(...)` arguments is gone because the decoder already returns a Python `str`; for the `output = ` / `condition` branches the literal prefix stays inline (`exec("output = " + $output, ...)`), so the runtime concatenates the literal with the decoded expression.
>
> Added `LoopOpDescsSpec` (new — there was no spec for the loop OpDescs) covering both: subclassing of `LoopStart/EndOperator`, the four `exec` call sites use `self.decode_python_template`, a tricky value with `"`, `'`, `\n`, `\` does NOT appear verbatim in the generated source, and empty-default fields still produce a parseable template. 10 tests, all passing.
>
> Diff scoped to the two OpDescs + the new spec; no unrelated churn.

---

**`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala` L76** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3262435359)

**@Copilot** — 2026-05-18:

> User-supplied expressions are interpolated raw into a double-quoted Python `exec(...)` string. Any double quote, backslash, or newline in `initialization` or `output` will produce a Python SyntaxError at operator-build time and is a code-injection vector (e.g., an `output` containing `"); import os; os.system("...` would escape the exec call entirely). Consider injecting these expressions as proper indented Python source rather than as `exec` string literals, or at minimum escape the value (e.g., wrap in triple-quotes and reject embedded triple-quotes).

---

**`common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala` L1** · [view on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892288)

**@Xiao-zhen-Liu** — 2026-05-22:

> Two test-suite issues:
> - Nothing in the tests actually compiles the generator's output. The Scala specs check that the generated string contains expected substrings; the Python tests use hand-written stub classes that mimic what the generator should produce. So a quote or newline in user input that breaks the codegen (`LoopStartOpDesc.scala:70-73` / `LoopEndOpDesc.scala:78-83`) passes both test layers and only fails at run time.
> - `LoopStartOpDescSpec` and `LoopEndOpDescSpec` duplicate about 80% of their scaffolding.

**@aglinxinyuan** — 2026-05-22:

> Both addressed in d41918f461.
>
> **1. Codegen robustness.** Both `LoopStartOpDesc.generatePythonCode` and `LoopEndOpDesc.generatePythonCode` are now built with the `pyb"..."` interpolator, with `initialization` / `output` / `update` / `condition` typed as `EncodableString`.
> Every user value is base64-encoded at build time and rendered as `self.decode_python_template(<b64>)` instead of being inlined as a raw quoted substring. A `"` / `'` / `\n` / `\` in user input therefore cannot escape into the surrounding Python syntax. Added 8 new tests (4 per spec) that exercise these exact tricky inputs and assert the raw text is **absent** from the generated source while the expected `decode_python_template`-wrapped substring is present.
>
> **2. Spec deduplication.** Extracted `LoopOpDescSpecMixin` carrying the `workflowId` / `executionId` vals, the `b64` / `decodeExpr` helpers, and the shared physical-op assertions (`assertNonParallelizableSingleWorker`, `assertPortsCarriedForward`, `assertOpExecWithPythonCodeForClass`, `assertUserInputIsBase64Wrapped`). Both specs are now focused on the per-operator differences only.

</details>
