This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-materialization
by this push:
new 4428c9d5b0 update
4428c9d5b0 is described below
commit 4428c9d5b0c9db35d6a018e9bb5571784c4ce735
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed May 6 03:08:40 2026 -0700
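Write operator state through background PortStorageWriter threads

Python and Scala OutputManager now enqueue state tuples onto a per-port
state-writer thread (mirroring the result-storage writers) instead of
calling put_one/putOne on the buffered writer directly. When the ports
are closed, those threads are stopped and joined so the buffered
writer's final flush actually runs. Tests updated to assert enqueueing
and thread shutdown.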
---
.../core/architecture/packaging/output_manager.py | 46 +++++++---
.../messaginglayer/OutputManager.scala | 16 ++--
.../architecture/packaging/test_output_manager.py | 100 +++++++++------------
3 files changed, 82 insertions(+), 80 deletions(-)
diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index 5614a64a6b..1220a9f15f 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -87,7 +87,9 @@ class OutputManager:
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()
- self._state_writers: typing.Dict[PortIdentity, typing.Any] = dict()
+ self._port_state_writers: typing.Dict[
+ PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
+ ] = dict()
def is_missing_output_ports(self):
"""
@@ -150,9 +152,25 @@ class OutputManager:
state_document, _ = DocumentFactory.open_document(
State.uri_from_result_uri(storage_uri)
)
- state_writer =
state_document.writer(str(get_worker_index(self.worker_id)))
- state_writer.open()
- self._state_writers[port_id] = state_writer
+ state_buffered_item_writer = state_document.writer(
+ str(get_worker_index(self.worker_id))
+ )
+ state_writer_queue = Queue()
+ state_port_writer = PortStorageWriter(
+ buffered_item_writer=state_buffered_item_writer,
+ queue=state_writer_queue,
+ )
+ state_writer_thread = threading.Thread(
+ target=state_port_writer.run,
+ daemon=True,
+ name=f"port_state_writer_thread_{port_id}",
+ )
+ state_writer_thread.start()
+ self._port_state_writers[port_id] = (
+ state_writer_queue,
+ state_port_writer,
+ state_writer_thread,
+ )
def get_port(self, port_id=None) -> WorkerPort:
return list(self._ports.values())[0]
@@ -182,14 +200,12 @@ class OutputManager:
)
def save_state_to_storage_if_needed(self, state: State, port_id=None) ->
None:
+ element = PortStorageWriterElement(data_tuple=state.to_tuple())
if port_id is None:
- writers = self._state_writers.values()
- elif port_id in self._state_writers:
- writers = [self._state_writers[port_id]]
- else:
- return
- for writer in writers:
- writer.put_one(state.to_tuple())
+ for writer_queue, _, _ in self._port_state_writers.values():
+ writer_queue.put(element)
+ elif port_id in self._port_state_writers:
+ self._port_state_writers[port_id][0].put(element)
def close_port_storage_writers(self) -> None:
"""
@@ -204,9 +220,11 @@ class OutputManager:
for _, _, writer_thread in self._port_storage_writers.values():
# This blocking call will wait for all the writer to finish commit
writer_thread.join()
- for state_writer in self._state_writers.values():
- state_writer.close()
- self._state_writers.clear()
+ for _, state_writer, _ in self._port_state_writers.values():
+ state_writer.stop()
+ for _, _, state_writer_thread in self._port_state_writers.values():
+ state_writer_thread.join()
+ self._port_state_writers.clear()
def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning)
-> None:
"""
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index a39ac1dcb4..9455b92556 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -124,7 +124,7 @@ class OutputManager(
: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
mutable.HashMap()
- private val stateWriters: mutable.HashMap[PortIdentity,
BufferedItemWriter[Tuple]] =
+ private val stateWriterThreads: mutable.HashMap[PortIdentity,
OutputPortResultWriterThread] =
mutable.HashMap()
/**
@@ -236,8 +236,8 @@ class OutputManager(
})
}
- def saveStateToStorageIfNeeded(state: State): Unit = {
- stateWriters.values.foreach(_.putOne(state.toTuple))
+ private def saveStateToStorageIfNeeded(state: State): Unit = {
+ stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple)))
}
/**
@@ -253,7 +253,10 @@ class OutputManager(
writerThread.join()
case None =>
}
- this.stateWriters.remove(outputPortId).foreach(_.close())
+ this.stateWriterThreads.remove(outputPortId).foreach { writerThread =>
+ writerThread.queue.put(Right(PortStorageWriterTerminateSignal))
+ writerThread.join()
+ }
}
def getPort(portId: PortIdentity): WorkerPort = ports(portId)
@@ -304,8 +307,9 @@ class OutputManager(
._1
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]
- stateWriter.open()
- this.stateWriters(portId) = stateWriter
+ val stateWriterThread = new OutputPortResultWriterThread(stateWriter)
+ this.stateWriterThreads(portId) = stateWriterThread
+ stateWriterThread.start()
}
}
diff --git a/amber/src/test/python/core/architecture/packaging/test_output_manager.py b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
index 8f1daf8052..dcf7ccde67 100644
--- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
+++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
@@ -15,15 +15,25 @@
# specific language governing permissions and limitations
# under the License.
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
import pytest
from core.architecture.packaging.output_manager import OutputManager
from core.models.state import State
+from core.storage.runnables.port_storage_writer import PortStorageWriterElement
from proto.org.apache.texera.amber.core import PortIdentity
+def _stub_state_writer(output_manager, port_id):
+ """Inject a (queue, writer, thread) triple as if a port were set up."""
+ queue = MagicMock()
+ writer = MagicMock()
+ thread = MagicMock()
+ output_manager._port_state_writers[port_id] = (queue, writer, thread)
+ return queue, writer, thread
+
+
class TestSaveStateToStorageIfNeeded:
@pytest.fixture
def output_manager(self):
@@ -44,84 +54,54 @@ class TestSaveStateToStorageIfNeeded:
def test_no_state_writers_is_a_noop(self, output_manager, state):
# With no port set up, save_state_to_storage_if_needed must not
# touch any writer.
- output_manager.save_state_to_storage_if_needed(state) # no-op, no
exception
+ output_manager.save_state_to_storage_if_needed(state) # no-op
def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a):
output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
# No assertion needed -- the absence of any writer means nothing
# was attempted.
- def test_writes_to_every_port_when_port_id_omitted(
+ def test_enqueues_to_every_port_when_port_id_omitted(
self, output_manager, state, port_a, port_b
):
- writer_a = MagicMock()
- writer_b = MagicMock()
- output_manager._state_writers[port_a] = writer_a
- output_manager._state_writers[port_b] = writer_b
+ queue_a, _, _ = _stub_state_writer(output_manager, port_a)
+ queue_b, _, _ = _stub_state_writer(output_manager, port_b)
output_manager.save_state_to_storage_if_needed(state)
- writer_a.put_one.assert_called_once()
- writer_b.put_one.assert_called_once()
- # Long-lived writers must NOT be closed per state -- otherwise
- # we'd be back to one Iceberg snapshot per state.
- writer_a.close.assert_not_called()
- writer_b.close.assert_not_called()
+ # Each port's writer queue receives one PortStorageWriterElement.
+ # Critically, save is non-blocking -- the call must not invoke
+ # put_one / close on the buffered writer directly (those happen
+ # off-thread).
+ assert queue_a.put.call_count == 1
+ assert queue_b.put.call_count == 1
+ assert isinstance(queue_a.put.call_args.args[0],
PortStorageWriterElement)
+ assert isinstance(queue_b.put.call_args.args[0],
PortStorageWriterElement)
- def test_writes_only_to_selected_port_when_port_id_specified(
+ def test_enqueues_only_to_selected_port_when_port_id_specified(
self, output_manager, state, port_a, port_b
):
- writer_a = MagicMock()
- writer_b = MagicMock()
- output_manager._state_writers[port_a] = writer_a
- output_manager._state_writers[port_b] = writer_b
+ queue_a, _, _ = _stub_state_writer(output_manager, port_a)
+ queue_b, _, _ = _stub_state_writer(output_manager, port_b)
output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
- writer_a.put_one.assert_called_once()
- writer_b.put_one.assert_not_called()
-
- def test_state_writer_is_opened_at_port_setup(self, output_manager,
port_a):
- # set_up_port_storage_writer should open the result document AND
- # the state document, then cache the state writer for reuse.
- result_doc = MagicMock()
- state_doc = MagicMock()
- state_writer = MagicMock()
- state_doc.writer.return_value = state_writer
-
- with patch(
- "core.architecture.packaging.output_manager.DocumentFactory"
- ) as mock_factory:
- mock_factory.open_document.side_effect = [
- (result_doc, MagicMock()),
- (state_doc, MagicMock()),
- ]
-
- output_manager.set_up_port_storage_writer(
- port_a, "vfs:///wf/0/exec/0/result/op-a"
- )
-
- opened = [c.args[0] for c in
mock_factory.open_document.call_args_list]
- assert opened == [
- "vfs:///wf/0/exec/0/result/op-a",
- "vfs:///wf/0/exec/0/state/op-a",
- ]
- state_writer.open.assert_called_once()
- assert output_manager._state_writers[port_a] is state_writer
-
- def test_close_port_storage_writers_flushes_state_writers(
+ assert queue_a.put.call_count == 1
+ queue_b.put.assert_not_called()
+
+ def test_close_port_storage_writers_stops_state_threads(
self, output_manager, port_a, port_b
):
- # After the port completes, the long-lived state writer's buffer
- # must be flushed and the writer closed (one Iceberg commit per
- # port instead of one per state).
- writer_a = MagicMock()
- writer_b = MagicMock()
- output_manager._state_writers[port_a] = writer_a
- output_manager._state_writers[port_b] = writer_b
+ # After the port completes, every state-writer thread must be
+ # stopped and joined so the buffered writer's close() (which
+ # flushes the final Iceberg commit) actually runs.
+ _, writer_a, thread_a = _stub_state_writer(output_manager, port_a)
+ _, writer_b, thread_b = _stub_state_writer(output_manager, port_b)
output_manager.close_port_storage_writers()
- writer_a.close.assert_called_once()
- writer_b.close.assert_called_once()
- assert output_manager._state_writers == {}
+ writer_a.stop.assert_called_once()
+ writer_b.stop.assert_called_once()
+ thread_a.join.assert_called_once()
+ thread_b.join.assert_called_once()
+ assert output_manager._port_state_writers == {}