This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 02018cdfc2 fix(amber): Python internal marker replay during reconfiguration (#4547)
02018cdfc2 is described below

commit 02018cdfc25eccde569e34a69d45bce18e8a692d
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Apr 28 03:50:29 2026 -0700

    fix(amber): Python internal marker replay during reconfiguration (#4547)
    
    ### What changes were proposed in this PR?
    This PR fixes the Python reconfiguration hang reported in #4545 and
    explicitly re-enables the regression test that was temporarily ignored
    in #4546.
    
    The regression came from `#4424` (`ef66190f22`), which changed the
    lifetime of the Python `current_internal_marker`. After that change,
    `get_internal_marker()` stopped consuming the marker on read, and the
    main loop deferred cleanup until after replaying internal channel
    markers.
    
    For Python source operators, that allowed an internal `EndChannel`
    marker to remain visible across the pause and reconfiguration window.
    When the reconfiguration ECM was processed, the stale marker could be
    observed and replayed again, which corrupted end-of-stream handling and
    caused the workflow to hang.
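    As a rough illustration of this failure mode, the hypothetical
    simulation below models a Python source worker that sees an
    `EndChannel` marker, pauses, processes a reconfiguration ECM, and only
    then runs the deferred cleanup. None of its names are Texera APIs, and
    the event order is a simplification. With a peek-style getter the
    marker is handled twice. With a read-and-clear getter it is handled once.

```python
from typing import List, Optional


def simulate(consume_on_read: bool) -> List[str]:
    """Hypothetical model of a Python source worker around a pause.

    Returns one entry per EndChannel handler invocation. All names here are
    illustrative and do not correspond to Texera APIs.
    """
    marker: Optional[str] = None
    handled: List[str] = []

    def read_marker() -> Optional[str]:
        nonlocal marker
        ret = marker
        if consume_on_read:
            marker = None  # read-and-clear semantics
        return ret

    for event in ["end_channel", "pause", "reconfigure_ecm", "resume", "cleanup"]:
        if event == "end_channel":
            marker = "EndChannel"
            if read_marker() is not None:
                handled.append("normal")
        elif event == "reconfigure_ecm":
            # Processing the ECM checks for a pending internal marker and
            # replays it if one is still visible.
            if read_marker() is not None:
                handled.append("replayed during reconfiguration")
        elif event == "cleanup":
            # Deferred cleanup that only runs after the pause window.
            marker = None
        # "pause" and "resume" change nothing in this simplified model.
    return handled


print(simulate(consume_on_read=False))
# ['normal', 'replayed during reconfiguration']
print(simulate(consume_on_read=True))
# ['normal']
```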
    
    This PR restores the expected one-time consumption behavior by:
    - making `get_internal_marker()` consume `current_internal_marker` when
    it is read
    - removing the extra delayed cleanup after replaying internal channel
    markers in the Python main loop
    - changing `should propagate reconfiguration through a source operator
    in workflow` in `ReconfigurationSpec` from `ignore` back to a normal
    enabled test
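    The restored pattern can be sketched roughly as follows. This sketch
    assumes (hypothetically) that the `StartChannel`/`EndChannel` handlers
    read the marker through `get_internal_marker()`. The getter swaps the
    field to `None` on read, using the same idiom as `get_input_tuple()`,
    so the dispatch site no longer needs its own cleanup line. `MiniLoop`
    and `Manager` are illustrative stand-ins, not the real `MainLoop` or
    `TupleProcessingManager`.

```python
from typing import Callable, Dict, List, Optional


class StartChannel:
    """Hypothetical stand-in for the StartChannel internal marker."""


class EndChannel:
    """Hypothetical stand-in for the EndChannel internal marker."""


class Manager:
    """Illustrative stand-in for TupleProcessingManager."""

    def __init__(self) -> None:
        self.current_internal_marker: Optional[object] = None

    def get_internal_marker(self) -> Optional[object]:
        # Read-and-clear: each marker can be observed exactly once.
        ret, self.current_internal_marker = self.current_internal_marker, None
        return ret


class MiniLoop:
    """Illustrative, heavily simplified stand-in for MainLoop."""

    def __init__(self) -> None:
        self.manager = Manager()
        self.log: List[str] = []

    def _process_start_channel(self) -> None:
        # Assumption: the handler consumes the marker via the getter.
        self.log.append(type(self.manager.get_internal_marker()).__name__)

    def _process_end_channel(self) -> None:
        self.log.append(type(self.manager.get_internal_marker()).__name__)

    def replay_internal_marker(self) -> None:
        marker = self.manager.current_internal_marker
        if marker is None:
            return  # nothing pending, e.g. a second replay attempt
        handlers: Dict[type, Callable[[], None]] = {
            StartChannel: self._process_start_channel,
            EndChannel: self._process_end_channel,
        }
        handlers[type(marker)]()
        # No trailing `current_internal_marker = None` here: the handler
        # already cleared it through get_internal_marker().


loop = MiniLoop()
loop.manager.current_internal_marker = EndChannel()
loop.replay_internal_marker()
loop.replay_internal_marker()  # stale-marker replay is now a no-op
print(loop.log)  # ['EndChannel']
```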
    
    ### Any related issues, documentation, discussions?
    Fixes #4545.
    
    Regression introduced by #4424.
    Re-enables the temporary test disable from #4546 after fixing the
    underlying lifecycle bug.
    
    ### How was this PR tested?
    Tested with existing Scala tests using Java 11:
    - `WorkflowExecutionService/testOnly
    org.apache.texera.amber.engine.e2e.ReconfigurationSpec`
    
    This run included the re-enabled `should propagate reconfiguration
    through a source operator in workflow` case. The full
    `ReconfigurationSpec` passed on the rebased branch (`5/5` passed, `0`
    ignored).
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Generated-by: OpenAI Codex (GPT-5)
---
 .../python/core/architecture/managers/tuple_processing_manager.py   | 3 ++-
 amber/src/main/python/core/runnables/main_loop.py                   | 1 -
 .../org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala    | 6 +-----
 3 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py b/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py
index 3d4c9eee26..a67949e671 100644
--- a/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py
+++ b/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py
@@ -33,7 +33,8 @@ class TupleProcessingManager:
         self.finished_current: Event = Event()
 
     def get_internal_marker(self) -> Optional[InternalMarker]:
-        return self.current_internal_marker
+        ret, self.current_internal_marker = self.current_internal_marker, None
+        return ret
 
     def get_input_tuple(self) -> Optional[Tuple]:
         ret, self.current_input_tuple = self.current_input_tuple, None
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index 7b22e08606..9356542a08 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -342,7 +342,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
                     StartChannel: self._process_start_channel,
                     EndChannel: self._process_end_channel,
                 }[type(self.context.tuple_processing_manager.current_internal_marker)]()
-                self.context.tuple_processing_manager.current_internal_marker = None
 
     def _send_ecm_to_data_channels(
         self, method_name: str, alignment: EmbeddedControlMessageType
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index c13a5c13aa..92dfba19de 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -256,11 +256,7 @@ class ReconfigurationSpec
     )
   }
 
-  // Disabled: the workflow hangs after the UDF processes the EndChannel ECM in this
-  // multi-worker (Python source -> Python UDF) propagation case, causing a 1-minute
-  // Await timeout. The single-op reconfigure path and the CSV-source variants pass.
-  // Re-enable once the source-propagation completion path is fixed.
-  "Engine" should "propagate reconfiguration through a source operator in workflow" ignore {
+  "Engine" should "propagate reconfiguration through a source operator in workflow" in {
     val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
     val udfOpDesc = TestOperators.pythonOpDesc()
     val code = """
