aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3406801998
##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -203,20 +211,42 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
PortStorageWriterElement(data_tuple=tuple_)
)
- def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+ def save_state_to_storage_if_needed(
+ self,
+ state: State,
+ loop_counter: int,
+ loop_start_id: str = "",
+ loop_start_state_uri: str = "",
+ port_id=None,
+ ) -> None:
# When port_id is omitted the same state row is fanned out to
# every output port's state table. This mirrors the
# broadcast-to-all-workers behavior on the emit side: state is
# shared context, not per-key data, so every downstream operator
# (and every worker reading the materialization) needs the full
# set.
- element = PortStorageWriterElement(data_tuple=state.to_tuple())
+ element = PortStorageWriterElement(
data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri)
+ )
if port_id is None:
for writer_queue, _, _ in self._port_state_writers.values():
writer_queue.put(element)
elif port_id in self._port_state_writers:
self._port_state_writers[port_id][0].put(element)
+ def reset_storage(self) -> None:
Review Comment:
You're right — that earlier work was lost when the branch was
rebased/squashed. Re-applied on the current branch in 2095b592ee.
* **Renamed** `reset_storage` → `reset_output_storage` (and updated the
caller in `main_loop.py` plus the two `__init__` / `set_up_port_storage_writer`
comments that already referenced the intended name).
* **Docstring** now states what it does (drop + recreate the result AND
state tables, then reopen the writers), that it's called only by a Loop End
worker once per iteration, and — the part that previously lived only in the PR
description — **why truncating live storage is safe**: a loop forces
MATERIALIZED execution mode, so downstream operators don't begin reading this
output until the loop region has fully completed; no reader can observe an
intermediate truncation.
* **Guards**: the two previously implicit preconditions, (1) exactly one output
port and (2) `set_up_port_storage_writer` has already run (`_storage_uri_base`
is populated), now raise a clear `RuntimeError` instead of silently resetting
the wrong port or dereferencing `None`.
* **Tests**: new `TestResetOutputStorage` in `test_output_manager.py` covers
the happy path (close → recreate result+state docs → reopen writer) and both
guard failures, with the iceberg/thread collaborators mocked.
9/9 `test_output_manager.py` and 24/24 `test_main_loop.py` tests green.
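For reference, the routing rule in `save_state_to_storage_if_needed` (an omitted `port_id` fans the same row out to every port's state table; a known `port_id` targets one table) can be exercised in isolation with `queue.Queue`. This is a hypothetical standalone rewrite: the name `route_state_element` is illustrative, and it assumes, as the diff's `for writer_queue, _, _ in ...` unpacking suggests, that each `_port_state_writers` value is a 3-tuple whose first item is the writer queue.

```python
import queue
from typing import Any, Dict, Optional, Tuple

# port_id -> (writer_queue, <unused>, <unused>); only the queue matters here.
StateWriters = Dict[str, Tuple["queue.Queue[Any]", Any, Any]]


def route_state_element(
    port_state_writers: StateWriters,
    element: Any,
    port_id: Optional[str] = None,
) -> None:
    if port_id is None:
        # Broadcast: state is shared context, so every port's state table
        # receives the same element.
        for writer_queue, _, _ in port_state_writers.values():
            writer_queue.put(element)
    elif port_id in port_state_writers:
        # Targeted write to a single port's state table.
        port_state_writers[port_id][0].put(element)
    # An unknown port_id is silently ignored, matching the diff.
```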
--
This is an automated message from the Apache Git Service.