Copilot commented on code in PR #4206: URL: https://github.com/apache/texera/pull/4206#discussion_r3262435359
########## common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopStartOpDesc extends LogicalOp { + @JsonProperty(required = true, defaultValue = "i = 0") + @JsonSchemaTitle("Initialization") + var initialization: String = _ + + @JsonProperty(required = true, defaultValue = "table.iloc[i]") + @JsonSchemaTitle("Output") + var output: String = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + 
operatorIdentifier, + OpExecWithCode(generatePythonCode(), "python") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Begin a loop that iterates over rows of the input table; pairs with Loop End.", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + + def generatePythonCode(): String = { + s""" + |from pytexera import * + |class ProcessLoopStartOperator(LoopStartOperator): + | @overrides + | def open(self): + | self.state = {"loop_counter": 0} + | exec("$initialization", {}, self.state) + | + | @overrides + | def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + | self.state["table"] = table + | exec("output = $output", {}, self.state) Review Comment: User-supplied expressions are interpolated raw into a double-quoted Python `exec(...)` string. Any double quote, backslash, or newline in `initialization` or `output` will produce a Python SyntaxError at operator-build time and is a code-injection vector (e.g., an `output` containing `"); import os; os.system("...` would escape the exec call entirely). Consider injecting these expressions as proper indented Python source rather than as `exec` string literals, or at minimum escape the value (e.g., wrap in triple-quotes and reject embedded triple-quotes). ########## common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopEndOpDesc extends LogicalOp { + @JsonProperty(required = true, defaultValue = "i += 1") + @JsonSchemaTitle("Update") + var update: String = _ + + @JsonProperty(required = true, defaultValue = "i < len(table)") + @JsonSchemaTitle("Condition") + var condition: String = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(generatePythonCode(), "python") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + .withIsLoopEnd(true) + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Close a loop body and decide whether to iterate again based on a 
condition; pairs with Loop Start.", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + + def generatePythonCode(): String = { + s""" + |from pytexera import * + |class ProcessLoopEndOperator(LoopEndOperator): + | @overrides + | def process_state(self, state: State, port: int) -> Optional[State]: + | loop_counter = int(state.get("loop_counter", 0)) + | if loop_counter > 0: + | state["loop_counter"] = loop_counter - 1 + | return state + | self.state = dict(state) + | from pickle import loads + | self.state["table"] = loads(self.state["table"]) + | exec("$update", {}, self.state) + | return None + | + | @overrides + | def condition(self) -> bool: + | exec("output = $condition", {}, self.state) Review Comment: Same problem as `LoopStartOpDesc`: `update` and `condition` are interpolated directly into double-quoted `exec(...)` literals. Any expression containing `"`, `\`, or a newline will produce invalid Python, and the values are effectively executable code-injection sites. Inline them as plain Python statements (e.g., on their own indented line) or escape them defensively. ########## amber/src/main/python/core/runnables/main_loop.py: ########## @@ -87,19 +92,54 @@ def __init__( target=self.data_processor.run, daemon=True, name="data_processor_thread" ).start() + def _attach_loop_start_id(self, output_state: State) -> None: + if "LoopStartId" in output_state: + return + output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit( + "-main-0", 1 + )[0] Review Comment: Deriving the LoopStart operator id from `worker_id` by string-splitting on `-` and `-main-0` is brittle: it silently assumes the worker name ends with `-main-0` and that the operator id itself never contains the literal `-main-0`. `SpecialPhysicalOpFactory` and other code paths already produce layer names containing underscores/hyphens, so a future renaming of the layer suffix or worker index will break this without any error. 
Prefer using the operator identity already available from the worker's context (e.g., the parsed `ActorVirtualIdentity` / physical-op id) rather than re-parsing the worker id string. ########## amber/src/main/python/core/models/operator.py: ########## @@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike] time, or None. """ yield + + +class LoopStartOperator(TableOperator): + @overrides.final + def process_state(self, state: State, port: int) -> Optional[State]: + if "LoopStartStateURI" in state: + state["loop_counter"] += 1 Review Comment: `LoopStartOperator.process_state` does `state["loop_counter"] += 1` when the incoming state is from an outer scope. This silently assumes the incoming state already contains a `loop_counter` key, which is only true if the outer scope was also produced by a `LoopStart` whose `open()` initialized it. Any non-loop upstream that supplies a state carrying `LoopStartStateURI` (or an outer LoopStart whose user `initialization` happens not to define `loop_counter`) will raise `KeyError` at runtime. Either default-initialize via `state.get("loop_counter", 0) + 1` or assert the precondition with a clear error. ########## amber/src/main/python/core/runnables/main_loop.py: ########## @@ -87,19 +92,54 @@ def __init__( target=self.data_processor.run, daemon=True, name="data_processor_thread" ).start() + def _attach_loop_start_id(self, output_state: State) -> None: + if "LoopStartId" in output_state: + return + output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit( + "-main-0", 1 + )[0] + # The URI lives on the upstream operator's output port (which + # LoopStart's first materialization reader is reading from). 
+ reader_runnables = ( + self.context.input_manager.get_input_port_mat_reader_threads() + ) + output_state["LoopStartStateURI"] = VFSURIFactory.state_uri( + next(iter(reader_runnables.values()))[0].uri + ) + + def _jump_to_loop_start( + self, executor: LoopEndOperator, controller_interface + ) -> None: + state = executor.state + controller_interface.jump_to_operator_region( + JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"])) + ) + uri = state["LoopStartStateURI"] + # Strip the per-iteration scratch (`table`, `output`) and the + # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the + # user-visible loop state is written back to LoopStart's input. + for key in ("table", "output", "LoopStartId", "LoopStartStateURI"): + state.pop(key, None) + writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0") + writer.put_one(State(state).to_tuple()) + writer.close() + def complete(self) -> None: """ Complete the DataProcessor, marking state to COMPLETED, and notify the controller. """ # flush the buffered console prints self._check_and_report_console_messages(force_flush=True) - self.context.executor_manager.executor.close() + controller_interface = self._async_rpc_client.controller_stub() + executor = self.context.executor_manager.executor + if isinstance(executor, LoopEndOperator) and executor.condition(): + self._jump_to_loop_start(executor, controller_interface) Review Comment: In `complete()`, `executor.condition()` is invoked for every `LoopEndOperator` worker, including nested LoopEnds whose `process_state` only saw the pass-through branch (`loop_counter > 0`) and therefore never executed `self.state = dict(state)`. In that case `self.state` either does not exist or is stale from a previous iteration, so `condition()` may raise `AttributeError`/`KeyError` or — worse — return a stale `True` and fire an unintended `jump_to_operator_region` writing garbage state back to LoopStart. 
Gate the `condition()`/jump on having actually absorbed a terminal state this iteration (e.g., remember whether `process_state` took the `loop_counter == 0` branch). ########## amber/src/main/python/core/runnables/main_loop.py: ########## @@ -87,19 +92,54 @@ def __init__( target=self.data_processor.run, daemon=True, name="data_processor_thread" ).start() + def _attach_loop_start_id(self, output_state: State) -> None: + if "LoopStartId" in output_state: + return + output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit( + "-main-0", 1 + )[0] + # The URI lives on the upstream operator's output port (which + # LoopStart's first materialization reader is reading from). + reader_runnables = ( + self.context.input_manager.get_input_port_mat_reader_threads() + ) + output_state["LoopStartStateURI"] = VFSURIFactory.state_uri( + next(iter(reader_runnables.values()))[0].uri + ) + + def _jump_to_loop_start( + self, executor: LoopEndOperator, controller_interface + ) -> None: + state = executor.state + controller_interface.jump_to_operator_region( + JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"])) + ) + uri = state["LoopStartStateURI"] + # Strip the per-iteration scratch (`table`, `output`) and the + # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the + # user-visible loop state is written back to LoopStart's input. + for key in ("table", "output", "LoopStartId", "LoopStartStateURI"): + state.pop(key, None) + writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0") + writer.put_one(State(state).to_tuple()) + writer.close() Review Comment: The hardcoded state-dict keys `"LoopStartId"`, `"LoopStartStateURI"`, `"loop_counter"`, `"table"`, and `"output"` live in user state alongside arbitrary user variables (the `initialization` block writes into `self.state` via `exec(..., self.state)`). 
A user whose loop body uses any of these names (especially `table` or `output`, which are the documented defaults in the operator UI) will collide silently: their values get stripped on writeback, and `condition` evaluating `i < len(table)` may suddenly see a pickled bytes object after the strip. Consider namespacing the loop machinery under a single reserved key (e.g., `"__loop__": {...}`) so user state is untouched. ########## amber/src/main/python/core/models/operator.py: ########## @@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike] time, or None. """ yield + + +class LoopStartOperator(TableOperator): + @overrides.final + def process_state(self, state: State, port: int) -> Optional[State]: + if "LoopStartStateURI" in state: + state["loop_counter"] += 1 + return state + self.state.update(state) + return None + + @overrides.final + def produce_state_on_finish(self, port: int) -> State: + from pickle import dumps + + self.state["table"] = dumps(Table(self._TableOperator__table_data[port])) + return dict(self.state) Review Comment: Reaching into `self._TableOperator__table_data` from a subclass relies on Python's private name-mangling and tightly couples `LoopStartOperator` to the internal storage detail of `TableOperator`. Any rename of `TableOperator.__table_data` (a private attribute, so legitimately renameable without notice) silently breaks loops. Consider exposing a protected accessor on `TableOperator` (e.g., `_get_table_data(port)`) and using it here, or store the pickled table inside `process_table` instead. 
########## amber/src/main/python/core/architecture/packaging/output_manager.py: ########## @@ -217,6 +225,19 @@ def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: elif port_id in self._port_state_writers: self._port_state_writers[port_id][0].put(element) + def reset_storage(self) -> None: + port_id = self.get_port_ids()[0] + storage_uri_base = self._storage_uri_base + self.close_port_storage_writers() + DocumentFactory.create_document( + VFSURIFactory.result_uri(storage_uri_base), + self._ports[port_id].get_schema(), + ) + DocumentFactory.create_document( + VFSURIFactory.state_uri(storage_uri_base), State.SCHEMA + ) + self.set_up_port_storage_writer(port_id, storage_uri_base) Review Comment: `reset_storage` blindly dereferences `self._storage_uri_base` and `self.get_port_ids()[0]`. If it is ever invoked before `set_up_port_storage_writer` has run (e.g., a LoopEnd that received state without yet having its output writer provisioned, or an operator with zero output ports), this will raise `TypeError`/`IndexError` with no context. Add a guard (or assertion with a descriptive error) and consider asserting that the executor really is a LoopEnd at the call site so this state-mutation is scoped to where it is meaningful. ########## amber/src/main/python/core/architecture/packaging/output_manager.py: ########## @@ -217,6 +225,19 @@ def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: elif port_id in self._port_state_writers: self._port_state_writers[port_id][0].put(element) + def reset_storage(self) -> None: Review Comment: The method name `reset_storage` is too generic for a behavior that only makes sense for LoopEnd operators (it truncates this worker's result and state iceberg tables). The block comment at the constructor even calls it `reset_loopend_storage`, matching the PR description. 
Renaming the public method to `reset_loopend_storage` (or similar) would make call sites self-documenting and avoid suggesting general-purpose use. ########## amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala: ########## @@ -66,7 +67,12 @@ class WorkflowExecutionService( ) extends SubscriptionManager with LazyLogging { - workflowContext.workflowSettings = request.workflowSettings + workflowContext.workflowSettings = + if (request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc])) { + request.workflowSettings.copy(executionMode = ExecutionMode.MATERIALIZED) + } else { + request.workflowSettings + } Review Comment: The coercion to `MATERIALIZED` happens only when a `LoopStartOpDesc` is present, but `LoopEndOpDesc` is what actually requires the iceberg output to persist across region invocations (see `RegionExecutionCoordinator`'s `isLoopEnd` branch). A workflow with a `LoopEndOpDesc` but no `LoopStartOpDesc` (e.g., malformed, partially-edited, or in a future use of LoopEnd as a standalone control op) would silently slip through in pipelined mode. Consider also checking for `LoopEndOpDesc` to keep both halves in sync. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
