aglinxinyuan opened a new pull request, #5700: URL: https://github.com/apache/texera/pull/5700
> _Re-opened from my fork to satisfy the requirement that contributions come from a fork rather than a branch on the main repository (the prior PR [#4206](https://github.com/apache/texera/pull/4206) was on an `apache/texera` branch). The code is unchanged. The full review history (Copilot's and @Xiao-zhen-Liu's review threads and my replies) is preserved on #4206 for reference._

---

### What changes were proposed in this PR?

Adds two new operators, **Loop Start** and **Loop End**, that let users write a for-loop inside a visual workflow. The loop iterates over rows of a pandas table. The user supplies four small Python snippets:

| Field | Where | Example |
|---|---|---|
| `initialization` | Loop Start | `i = 0` |
| `output` | Loop Start | `table.iloc[i]` (the row passed into the loop body each iteration) |
| `update` | Loop End | `i += 1` |
| `condition` | Loop End | `i < len(table)` (keep looping while this is true) |

Operators placed between Loop Start and Loop End make up the loop body and run once per iteration. When `condition` returns true, the runtime starts another iteration; when it returns false, downstream operators run on the accumulated output.

#### How an iteration works

```
Upstream Table
      │
      ▼
 ┌──────────┐  loop variables: row i,    ┌──────────┐               ┌─────────┐
 │  Loop    ├───counters, accumulators ─►│  loop    ├──────────────►│  Loop   │
 │  Start   │  (the loop's "state")      │  body    │               │  End    │
 └──────────┘                            └──────────┘               └────┬────┘
      ▲                                                                  │
      │   (1) DCM: "schedule the Loop Start region again"                │
      │   (2) write the next iteration's state (i, accumulators, table)  │
      │       to the iceberg table that Loop Start reads its input from  │
      └──────────────────────────────────────────────────────────────────┘
                          when condition() == True
```

The arrow from Loop End back to Loop Start is **not an edge in the workflow graph**; the region DAG stays acyclic. The loop-back is done with two separate steps when an iteration ends:

1. Loop End sends a **DCM** (Direct Control Message: Texera's worker→controller control-channel message; it does not flow along data edges, so it doesn't break the acyclic DAG) named `jump_to_operator_region`, asking the controller to schedule the Loop Start region one more time.
2. Loop End writes the updated state (a dict with `i`, any accumulators, and the pickled table) into the iceberg table that Loop Start reads from at the start of every iteration.

The newly scheduled Loop Start region then picks up that state and runs the next iteration. The "iceberg table Loop Start reads from" is the same cross-region state channel introduced in #4490; this PR reuses it as the back-edge for loops.

#### What changed

| Area | File | Purpose |
|---|---|---|
| Operator definitions | `LoopStartOpDesc.scala`, `LoopEndOpDesc.scala` | Code-gen the Python operator from the user's `initialization` / `output` / `update` / `condition` expressions |
| Operator runtime base | `core/models/operator.py`: `LoopStartOperator`, `LoopEndOperator` | Python superclasses the generated code extends; manage `loop_counter` and the state dict |
| Scheduler | `RegionExecutionCoordinator.scala` | Don't recreate Loop End's iceberg output between iterations (the output accumulates the body's rows; recreating it would erase what we just wrote) |
| Output-storage reuse flag | `PhysicalOp.scala`: `reusesOutputStorageOnReExecution: Boolean` + `withReusesOutputStorageOnReExecution` | A flag the scheduler checks (instead of string-matching the operator id) to skip recreating an operator's output on region re-execution; Loop End sets it so a loop's own iterations accumulate |
| Worker runtime | `MainLoop._attach_loop_start_id`, `_jump_to_loop_start` | On Loop End completion: send the DCM and write the next state |
| Worker output | `OutputManager.reset_output_storage` | A Loop End's materialized output **accumulates the results of all of its own iterations** (a single / outermost loop keeps everything: the scheduler reuses its output doc across re-runs and the writer appends). `reset_output_storage` exists only for the **inner** Loop End of a **nested** loop: it fires once per **outer** iteration, on the outer-loop pass-through (`loop_counter > 0`) in `_process_state_frame`, dropping and recreating that Loop End's result/state iceberg tables so the new outer iteration accumulates from empty instead of concatenating across outer iterations. A single / outermost Loop End never sees `loop_counter > 0`, so it never resets. This is safe because loops run MATERIALIZED: downstream doesn't read until the loop region completes. |
| Execution mode | `WorkflowExecutionService.scala` | Reject a loop workflow that is submitted with a non-MATERIALIZED execution mode by throwing an `IllegalArgumentException` with an actionable message. `WorkflowService` catches it and routes it through `errorHandler` → `WorkflowFatalError` → the existing diff handler, so the frontend sees a clear "switch the mode and re-run" error instead of the UI and the engine silently disagreeing. |
| Frontend | `LoopStart.png`, `LoopEnd.png` | Operator icons |

#### Nested loops

Each state carries a `loop_counter` integer marking which loop's iteration the state belongs to. This is what keeps an **inner** Loop End from accidentally consuming an **outer** loop's state.

| Operator | Rule |
|---|---|
| Inner `LoopStart.process_state` | If the state already came from a Loop Start (`LoopStartStateURI` is set), `loop_counter += 1` and pass it through. |
| Inner `LoopEnd.process_state` | If `loop_counter > 0`, decrement and pass it through (this state belongs to an outer loop). |
| Matching `LoopEnd` | If `loop_counter == 0`, the state is mine: run `update`, evaluate `condition`, jump back to my paired Loop Start. |

So when two loops are nested, the outer loop's state walks through the inner Loop Start (+1) and the inner Loop End (−1) untouched, arrives at the outer Loop End at `loop_counter == 0`, and only there is it consumed.

### Any related issues, documentation, discussions?

Closes #4442. Builds on #4490 (cross-region state materialization) and #5085 (`DocumentFactory.documentExists`).

### How was this PR tested?

* `LoopStartOpDescSpec.scala` / `LoopEndOpDescSpec.scala`: code-gen output, ports, `isLoopEnd` flag.
* `test_loop_operators.py`: runtime base classes, covering the flat-loop matching branch (runs `update` / `condition`) **and** the nested-loop pass-through (`loop_counter` +/− symmetric across an outer × inner traversal).
* `sbt scalafmtCheckAll scalafixAll --check` and `ruff check` clean.

#### Manual workflows

Input for both is a 3-row table from `TextInput("1\n2\n3")`. Each loop's condition is `i < len(table)`.

| Workflow | Topology | Expected |
|---|---|---|
| [Loop.json](https://github.com/user-attachments/files/27985168/Loop.json) | `TextInput → LoopStart → LoopEnd` | 3 iterations, workflow terminates. |
| [Nested Loop.json](https://github.com/user-attachments/files/27985169/Nested.Loop.json) | `TextInput → OuterLoopStart → InnerLoopStart → InnerLoopEnd → OuterLoopEnd` | Outer runs 3 times, inner runs 3 times per outer iteration = **9** total inner iterations. Workflow terminates. |

Demo:

Basic Loop:
<img width="1715" height="368" alt="loop" src="https://github.com/user-attachments/assets/4b9ea672-b5c9-4392-9ac3-764cc9cbb772" />

Nested Loop:
<img width="1715" height="368" alt="nested" src="https://github.com/user-attachments/assets/240ea180-43ca-4815-aa6b-0ff82eb81d7b" />

### Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.7 in compliance with ASF.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
