This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 02018cdfc2 fix(amber): Python internal marker replay during reconfiguration (#4547)
02018cdfc2 is described below
commit 02018cdfc25eccde569e34a69d45bce18e8a692d
Author: Yicong Huang <[email protected]>
AuthorDate: Tue Apr 28 03:50:29 2026 -0700
fix(amber): Python internal marker replay during reconfiguration (#4547)
### What changes were proposed in this PR?
This PR fixes the Python reconfiguration hang reported in #4545 and
explicitly re-enables the regression test that was temporarily ignored
in #4546.
The regression came from #4424 (`ef66190f22`), which changed the
lifetime of `current_internal_marker` in the Python
`TupleProcessingManager`. After that change, `get_internal_marker()`
no longer consumed the marker on read. Instead, the main loop cleared
it only after replaying internal channel markers.
For Python source operators, this allowed an internal `EndChannel`
marker to remain visible across the pause and reconfiguration window.
When the reconfiguration ECM (embedded control message) was processed,
the stale marker could be observed and replayed a second time, which
corrupted end-of-stream handling and caused the workflow to hang.
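To make the failure mode concrete, here is a toy model, not the actual Texera code. `PeekingManager`, `simulate`, and the stand-in `EndChannel` class are hypothetical simplifications. The model assumes only that the marker is read once during normal processing and once more while the reconfiguration ECM is handled, before the deferred cleanup runs.

```python
from typing import Optional


class EndChannel:
    """Hypothetical stand-in for the internal EndChannel marker."""


class PeekingManager:
    """Toy model of the post-#4424 getter: reading does NOT clear the marker."""

    def __init__(self) -> None:
        self.current_internal_marker: Optional[EndChannel] = None

    def get_internal_marker(self) -> Optional[EndChannel]:
        # Peek only: the marker stays set until something clears it explicitly.
        return self.current_internal_marker


def simulate(manager: PeekingManager) -> int:
    """Count EndChannel handling in a toy pause/reconfiguration window."""
    end_channel_handled = 0
    manager.current_internal_marker = EndChannel()

    # 1. Normal processing observes the marker and handles end-of-stream.
    if isinstance(manager.get_internal_marker(), EndChannel):
        end_channel_handled += 1

    # 2. Hypothetical pause before cleanup: handling the reconfiguration ECM
    #    reads the marker again and sees the same, now stale, EndChannel.
    if isinstance(manager.get_internal_marker(), EndChannel):
        end_channel_handled += 1

    # 3. The deferred cleanup only runs after the stale replay has happened.
    manager.current_internal_marker = None
    return end_channel_handled


print(simulate(PeekingManager()))  # 2
```

In this model end-of-stream is handled twice for a single marker, which is the kind of double replay described above.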
This PR restores the expected one-time consumption behavior by:
- making `get_internal_marker()` consume `current_internal_marker` when
it is read
- removing the extra delayed cleanup after replaying internal channel
markers in the Python main loop
- switching the `ReconfigurationSpec` case
`should propagate reconfiguration through a source operator in workflow`
from `ignore` back to `in`, so it runs as a normal enabled test
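The consume-on-read change follows the same tuple-swap pattern that `get_input_tuple()` already uses in the diff below. The sketch uses a simplified stand-in (`ConsumingManager` and `EndChannel` are hypothetical). It shows that a second read after the first returns `None`, so a later handler has nothing to replay.

```python
from typing import Optional


class EndChannel:
    """Hypothetical stand-in for the internal EndChannel marker."""


class ConsumingManager:
    """Toy model of the fixed getter: reading the marker also clears it."""

    def __init__(self) -> None:
        self.current_internal_marker: Optional[EndChannel] = None

    def get_internal_marker(self) -> Optional[EndChannel]:
        # Python evaluates the right-hand side first, so `ret` receives the
        # old marker before the attribute is reset to None.
        ret, self.current_internal_marker = self.current_internal_marker, None
        return ret


manager = ConsumingManager()
manager.current_internal_marker = EndChannel()
first = manager.get_internal_marker()
second = manager.get_internal_marker()
print(isinstance(first, EndChannel), second)  # True None
```

Because the getter clears the marker itself, the main loop no longer needs a separate cleanup step after replaying the marker.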
### Any related issues, documentation, discussions?
Fixes #4545.
Regression introduced by #4424.
Re-enables the test that #4546 temporarily disabled, now that the
underlying lifecycle bug is fixed.
### How was this PR tested?
Tested with existing Scala tests using Java 11:
- `WorkflowExecutionService/testOnly org.apache.texera.amber.engine.e2e.ReconfigurationSpec`
This run included the re-enabled `should propagate reconfiguration
through a source operator in workflow` case. The full
`ReconfigurationSpec` passed on the rebased branch (`5/5` passed, `0`
ignored).
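The Scala end-to-end spec exercises the full workflow. For a faster unit-level check of the same "replay at most once" invariant, a Python test could look roughly like the one below. Everything in it is a hypothetical simplification and not code from the Texera repository: `ToyManager`, `ToyWorker`, the marker classes, and the dict-based dispatch, which is modeled loosely on the `{StartChannel: ..., EndChannel: ...}[type(...)]()` table in `main_loop.py`.

```python
import unittest
from typing import List, Optional, Union


class StartChannel:
    """Hypothetical stand-in for the internal StartChannel marker."""


class EndChannel:
    """Hypothetical stand-in for the internal EndChannel marker."""


Marker = Union[StartChannel, EndChannel]


class ToyManager:
    """Hypothetical manager whose getter consumes the marker on read."""

    def __init__(self) -> None:
        self.current_internal_marker: Optional[Marker] = None

    def get_internal_marker(self) -> Optional[Marker]:
        ret, self.current_internal_marker = self.current_internal_marker, None
        return ret


class ToyWorker:
    """Hypothetical worker that dispatches an internal marker by its type."""

    def __init__(self) -> None:
        self.manager = ToyManager()
        self.handled: List[str] = []

    def replay_internal_marker(self) -> None:
        marker = self.manager.get_internal_marker()
        if marker is None:
            return  # Already consumed: nothing to replay.
        {
            StartChannel: lambda: self.handled.append("start"),
            EndChannel: lambda: self.handled.append("end"),
        }[type(marker)]()


class ConsumeOnReadTest(unittest.TestCase):
    def test_end_channel_is_replayed_once(self) -> None:
        worker = ToyWorker()
        worker.manager.current_internal_marker = EndChannel()
        worker.replay_internal_marker()  # normal end-of-stream processing
        worker.replay_internal_marker()  # a later pass, e.g. while handling an ECM
        self.assertEqual(worker.handled, ["end"])
        self.assertIsNone(worker.manager.current_internal_marker)
```

It can be run with `python -m unittest` against the module that contains it.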
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: OpenAI Codex (GPT-5)
---
.../python/core/architecture/managers/tuple_processing_manager.py | 3 ++-
amber/src/main/python/core/runnables/main_loop.py | 1 -
.../org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala | 6 +-----
3 files changed, 3 insertions(+), 7 deletions(-)
diff --git a/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py b/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py
index 3d4c9eee26..a67949e671 100644
--- a/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py
+++ b/amber/src/main/python/core/architecture/managers/tuple_processing_manager.py
@@ -33,7 +33,8 @@ class TupleProcessingManager:
self.finished_current: Event = Event()
def get_internal_marker(self) -> Optional[InternalMarker]:
- return self.current_internal_marker
+ ret, self.current_internal_marker = self.current_internal_marker, None
+ return ret
def get_input_tuple(self) -> Optional[Tuple]:
ret, self.current_input_tuple = self.current_input_tuple, None
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index 7b22e08606..9356542a08 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -342,7 +342,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
StartChannel: self._process_start_channel,
EndChannel: self._process_end_channel,
}[type(self.context.tuple_processing_manager.current_internal_marker)]()
- self.context.tuple_processing_manager.current_internal_marker = None
def _send_ecm_to_data_channels(
self, method_name: str, alignment: EmbeddedControlMessageType
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index c13a5c13aa..92dfba19de 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -256,11 +256,7 @@ class ReconfigurationSpec
)
}
- // Disabled: the workflow hangs after the UDF processes the EndChannel ECM in this
- // multi-worker (Python source -> Python UDF) propagation case, causing a 1-minute
- // Await timeout. The single-op reconfigure path and the CSV-source variants pass.
- // Re-enable once the source-propagation completion path is fixed.
- "Engine" should "propagate reconfiguration through a source operator in workflow" ignore {
+ "Engine" should "propagate reconfiguration through a source operator in workflow" in {
val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
val udfOpDesc = TestOperators.pythonOpDesc()
val code = """