This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git

commit aa9fe2d9f084313926bd6bcc766145035381e4ce
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 13 17:28:48 2026 -0700

    fix(loop): address Copilot review on #5700
    
    - virtual_identity get_worker_index/get_operator_id: use re.fullmatch so a
      malformed worker id (trailing junk) fails loudly, matching the Scala
      VirtualIdentityUtils full-match semantics and the docstring's stated
      intent.
    - RegionExecutionCoordinator.createOutputPortStorageObjects: decide
      reusesOutputStorage per the operator that owns each output port rather
      than region-wide, so a region mixing a reuse op (LoopEnd) with others
      still recreates the others' documents on re-execution. (No behavior
      change today since loop ops are single-op materialized regions;
      correct-by-construction.)
    - output_manager.close_port_storage_writers: clear both the result and state
      writer registries after stopping, not just the state registry.
    - LoopEndOpDesc.generatePythonCode: normalize generated method-body
      indentation to 8 spaces (was 6), matching LoopStart and PEP8. The
      6-space form was valid Python; this is cosmetic consistency only.
---
 amber/src/main/python/core/architecture/packaging/output_manager.py | 4 +++-
 amber/src/main/python/core/util/virtual_identity.py                 | 4 ++--
 .../engine/architecture/scheduling/RegionExecutionCoordinator.scala | 6 +++++-
 .../scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala | 6 +++---
 4 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index a8581e85df..24075f474f 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -279,7 +279,9 @@ class OutputManager:
                 writer.stop()
             for _, _, thread in registry.values():
                 thread.join()
-        self._port_state_writers.clear()
+            # Drop the stopped writers so a later reset/close doesn't act on
+            # stale entries (set_up_port_storage_writer repopulates on reset).
+            registry.clear()
 
     def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) 
-> None:
         """
diff --git a/amber/src/main/python/core/util/virtual_identity.py b/amber/src/main/python/core/util/virtual_identity.py
index 7d332f4bb6..143aa86d0e 100644
--- a/amber/src/main/python/core/util/virtual_identity.py
+++ b/amber/src/main/python/core/util/virtual_identity.py
@@ -30,7 +30,7 @@ MATERIALIZATION_READER_ACTOR_PREFIX = 
"MATERIALIZATION_READER_"
 
 
 def get_worker_index(worker_id: str) -> int:
-    match = worker_name_pattern.match(worker_id)
+    match = worker_name_pattern.fullmatch(worker_id)
     if match:
         return int(match.group(4))
     raise ValueError("Invalid worker ID format")
@@ -46,7 +46,7 @@ def get_operator_id(worker_id: str) -> str:
     non-match), this raises ``ValueError`` so a malformed worker id fails 
loudly
     rather than yielding a wrong id silently.
     """
-    match = worker_name_pattern.match(worker_id)
+    match = worker_name_pattern.fullmatch(worker_id)
     if match:
         return match.group(2)
     raise ValueError(f"Invalid worker ID format: {worker_id}")
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 9d43f4e18b..1f3e563bca 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -618,7 +618,11 @@ class RegionExecutionCoordinator(
         // output once per outer iteration -- on the Python worker side in
         // MainLoop._process_state_frame -- which is orthogonal to this
         // region-provisioning reuse.)
-        val reusesOutputStorage = 
region.getOperators.exists(_.reusesOutputStorageOnReExecution)
+        // Decided per the operator that OWNS this port, not region-wide: a
+        // region mixing a reuse op (LoopEnd) with others must still recreate
+        // the others' documents on re-execution.
+        val reusesOutputStorage =
+          
region.getOperator(outputPortId.opId).reusesOutputStorageOnReExecution
         Seq((resultURI, schema), (stateURI, State.schema)).foreach {
           case (uri, sch) =>
             RegionExecutionCoordinator.provisionOutputDocument(
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
index ff7486385c..b2d2a49de9 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
@@ -51,12 +51,12 @@ class LoopEndOpDesc extends LoopOpDesc {
        |class ProcessLoopEndOperator(LoopEndOperator):
        |    @overrides
        |    def process_state(self, state: State, port: int) -> 
Optional[State]:
-       |      self.run_update($update, state)
-       |      return None
+       |        self.run_update($update, state)
+       |        return None
        |
        |    @overrides
        |    def condition(self) -> bool:
-       |      return self.eval_condition($condition)
+       |        return self.eval_condition($condition)
        |""".encode
   }
 }

Reply via email to