This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 581d574ca694160a2b29fdbb62969e63a332bc09
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon May 4 23:00:03 2026 -0700

    feat(amber): materialize per-port state to Iceberg storage
    
    Adds a state-materialization path alongside the existing tuple-result
    storage. State produced by an operator's processState is written to a
    companion Iceberg table whose URI is derived from the result URI. The
    input-port materialization reader replays both tuples and states into
    downstream workers.
    
    Key pieces:
    
    - New STATE resource type and a state-namespace storage config entry
      on both Python and Scala sides; namespaces are read from
      StorageConfig instead of hardcoded strings.
    - RegionExecutionCoordinator provisions a state document next to every
      result document at scheduling time, so readers and writers can rely
      on its presence without try/catch.
    - One long-lived BufferedItemWriter per output port, opened at port
      setup and closed at port completion, so a single Iceberg snapshot is
      produced per port instead of one per state.
    - DataProcessor.processInputState (Scala) and MainLoop.process_input_state
      (Python) persist the executor's *output* state, matching the state
      that is also emitted downstream.
    - New Python and Scala unit tests covering the State JSON wire format,
      the OutputManager state-writer lifecycle, the reader's state-replay
      block, and DocumentFactory namespace routing.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../core/architecture/packaging/output_manager.py  |  30 +++-
 .../architecture/packaging/test_output_manager.py  | 127 ++++++++++++++
 amber/src/main/python/core/models/state.py         |   4 +
 amber/src/main/python/core/runnables/main_loop.py  |   1 +
 .../main/python/core/runnables/test_main_loop.py   |  76 +++++++++
 .../main/python/core/storage/document_factory.py   | 107 ++++++------
 .../core/storage/iceberg/test_iceberg_document.py  |  84 +++++++++
 .../input_port_materialization_reader_runnable.py  |  28 ++-
 ...t_input_port_materialization_reader_runnable.py | 190 +++++++++++++++++++++
 .../src/main/python/core/storage/storage_config.py |   3 +
 .../python/core/storage/test_document_factory.py   | 134 +++++++++++++++
 .../main/python/core/storage/vfs_uri_factory.py    |   1 +
 .../pytexera/storage/test_large_binary_manager.py  |   1 +
 amber/src/main/python/texera_run_python_worker.py  |   2 +
 .../messaginglayer/OutputManager.scala             |  19 ++-
 .../pythonworker/PythonWorkflowWorker.scala        |   1 +
 .../scheduling/RegionExecutionCoordinator.scala    |   3 +
 .../engine/architecture/worker/DataProcessor.scala |   1 +
 .../InputPortMaterializationReaderThread.scala     |  26 ++-
 common/config/src/main/resources/storage.conf      |   3 +
 .../amber/config/EnvironmentalVariable.scala       |   1 +
 .../apache/texera/amber/config/StorageConfig.scala |   3 +
 .../org/apache/texera/amber/core/state/State.scala |   4 +
 .../amber/core/storage/DocumentFactory.scala       |   2 +
 .../texera/amber/core/storage/VFSURIFactory.scala  |   1 +
 .../result/iceberg/IcebergDocumentSpec.scala       |  79 +++++++++
 26 files changed, 877 insertions(+), 54 deletions(-)

diff --git 
a/amber/src/main/python/core/architecture/packaging/output_manager.py 
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index bf4afbf396..08fa210eca 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -87,6 +87,8 @@ class OutputManager:
             PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
         ] = dict()
 
+        self._state_writers: typing.Dict[PortIdentity, typing.Any] = dict()
+
     def is_missing_output_ports(self):
         """
         This method is only used for ensuring correct region execution.
@@ -124,7 +126,8 @@ class OutputManager:
     def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: 
str):
         """
         Create a separate thread for saving output tuples of a port
-        to storage in batch.
+        to storage in batch, and open a long-lived buffered writer for
+        state materialization on the same port.
         """
         document, _ = DocumentFactory.open_document(storage_uri)
         buffered_item_writer = 
document.writer(str(get_worker_index(self.worker_id)))
@@ -144,6 +147,13 @@ class OutputManager:
             writer_thread,
         )
 
+        state_document, _ = DocumentFactory.open_document(
+            State.uri_from_result_uri(storage_uri)
+        )
+        state_writer = 
state_document.writer(str(get_worker_index(self.worker_id)))
+        state_writer.open()
+        self._state_writers[port_id] = state_writer
+
     def get_port(self, port_id=None) -> WorkerPort:
         return list(self._ports.values())[0]
 
@@ -171,6 +181,19 @@ class OutputManager:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
+    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> 
None:
+        # Buffer the state on each long-lived writer; the writer flushes
+        # itself when its buffer fills, and the remaining buffer is
+        # flushed in close_port_storage_writers.
+        if port_id is None:
+            writers = self._state_writers.values()
+        elif port_id in self._state_writers:
+            writers = [self._state_writers[port_id]]
+        else:
+            return
+        for writer in writers:
+            writer.put_one(state.to_tuple())
+
     def close_port_storage_writers(self) -> None:
         """
         Flush the buffers of port storage writers and wait for all the
@@ -184,6 +207,11 @@ class OutputManager:
         for _, _, writer_thread in self._port_storage_writers.values():
             # This blocking call will wait for all the writer to finish commit
             writer_thread.join()
+        # Close the long-lived state writers so the remaining buffered
+        # states are committed in a single Iceberg snapshot per port.
+        for state_writer in self._state_writers.values():
+            state_writer.close()
+        self._state_writers.clear()
 
     def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) 
-> None:
         """
diff --git 
a/amber/src/main/python/core/architecture/packaging/test_output_manager.py 
b/amber/src/main/python/core/architecture/packaging/test_output_manager.py
new file mode 100644
index 0000000000..8f1daf8052
--- /dev/null
+++ b/amber/src/main/python/core/architecture/packaging/test_output_manager.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from core.architecture.packaging.output_manager import OutputManager
+from core.models.state import State
+from proto.org.apache.texera.amber.core import PortIdentity
+
+
+class TestSaveStateToStorageIfNeeded:
+    @pytest.fixture
+    def output_manager(self):
+        return OutputManager(worker_id="Worker:WF0-test-main-0")
+
+    @pytest.fixture
+    def port_a(self):
+        return PortIdentity(id=0, internal=False)
+
+    @pytest.fixture
+    def port_b(self):
+        return PortIdentity(id=1, internal=False)
+
+    @pytest.fixture
+    def state(self):
+        return State({"loop_counter": 1, "i": 2})
+
+    def test_no_state_writers_is_a_noop(self, output_manager, state):
+        # With no port set up, save_state_to_storage_if_needed must not
+        # touch any writer.
+        output_manager.save_state_to_storage_if_needed(state)  # no-op, no 
exception
+
+    def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a):
+        output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+        # No assertion needed -- the absence of any writer means nothing
+        # was attempted.
+
+    def test_writes_to_every_port_when_port_id_omitted(
+        self, output_manager, state, port_a, port_b
+    ):
+        writer_a = MagicMock()
+        writer_b = MagicMock()
+        output_manager._state_writers[port_a] = writer_a
+        output_manager._state_writers[port_b] = writer_b
+
+        output_manager.save_state_to_storage_if_needed(state)
+
+        writer_a.put_one.assert_called_once()
+        writer_b.put_one.assert_called_once()
+        # Long-lived writers must NOT be closed per state -- otherwise
+        # we'd be back to one Iceberg snapshot per state.
+        writer_a.close.assert_not_called()
+        writer_b.close.assert_not_called()
+
+    def test_writes_only_to_selected_port_when_port_id_specified(
+        self, output_manager, state, port_a, port_b
+    ):
+        writer_a = MagicMock()
+        writer_b = MagicMock()
+        output_manager._state_writers[port_a] = writer_a
+        output_manager._state_writers[port_b] = writer_b
+
+        output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+
+        writer_a.put_one.assert_called_once()
+        writer_b.put_one.assert_not_called()
+
+    def test_state_writer_is_opened_at_port_setup(self, output_manager, 
port_a):
+        # set_up_port_storage_writer should open the result document AND
+        # the state document, then cache the state writer for reuse.
+        result_doc = MagicMock()
+        state_doc = MagicMock()
+        state_writer = MagicMock()
+        state_doc.writer.return_value = state_writer
+
+        with patch(
+            "core.architecture.packaging.output_manager.DocumentFactory"
+        ) as mock_factory:
+            mock_factory.open_document.side_effect = [
+                (result_doc, MagicMock()),
+                (state_doc, MagicMock()),
+            ]
+
+            output_manager.set_up_port_storage_writer(
+                port_a, "vfs:///wf/0/exec/0/result/op-a"
+            )
+
+            opened = [c.args[0] for c in 
mock_factory.open_document.call_args_list]
+            assert opened == [
+                "vfs:///wf/0/exec/0/result/op-a",
+                "vfs:///wf/0/exec/0/state/op-a",
+            ]
+            state_writer.open.assert_called_once()
+            assert output_manager._state_writers[port_a] is state_writer
+
+    def test_close_port_storage_writers_flushes_state_writers(
+        self, output_manager, port_a, port_b
+    ):
+        # After the port completes, the long-lived state writer's buffer
+        # must be flushed and the writer closed (one Iceberg commit per
+        # port instead of one per state).
+        writer_a = MagicMock()
+        writer_b = MagicMock()
+        output_manager._state_writers[port_a] = writer_a
+        output_manager._state_writers[port_b] = writer_b
+
+        output_manager.close_port_storage_writers()
+
+        writer_a.close.assert_called_once()
+        writer_b.close.assert_called_once()
+        assert output_manager._state_writers == {}
diff --git a/amber/src/main/python/core/models/state.py 
b/amber/src/main/python/core/models/state.py
index 003aaa212a..3ce610bbee 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -41,6 +41,10 @@ class State(dict):
     def from_tuple(cls, row: Tuple) -> "State":
         return cls.from_json(row[cls.CONTENT])
 
+    @staticmethod
+    def uri_from_result_uri(result_uri: str) -> str:
+        return result_uri.replace("/result", "/state")
+
 
 _TYPE_MARKER = "__texera_type__"
 _PAYLOAD_MARKER = "payload"
diff --git a/amber/src/main/python/core/runnables/main_loop.py 
b/amber/src/main/python/core/runnables/main_loop.py
index ab35cda81b..1334af12bf 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -202,6 +202,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
                         payload=batch,
                     )
                 )
+            
self.context.output_manager.save_state_to_storage_if_needed(output_state)
 
     def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
         """
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py 
b/amber/src/main/python/core/runnables/test_main_loop.py
index c9daa633f5..534493f0c2 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -1388,6 +1388,82 @@ class TestMainLoop:
         assert second_output.payload.frame["value"] == 42
         assert second_output.payload.frame["port"] == 0
 
+    @pytest.mark.timeout(2)
+    def test_process_input_state_persists_output_state_to_storage(
+        self,
+        main_loop,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # process_input_state must invoke save_state_to_storage_if_needed
+        # with the freshly emitted output state, so every state that flows
+        # downstream is also durable on the upstream output port.
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                return State({"value": state["value"] + 1, "port": port})
+
+        saved_states: list[State] = []
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: 
None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
+        )
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state: saved_states.append(state),
+        )
+
+        def fake_switch_context():
+            current_input_state = (
+                main_loop.context.state_processing_manager.current_input_state
+            )
+            if current_input_state is not None:
+                
main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        main_loop._process_state(State({"value": 1}))
+        main_loop._process_state(State({"value": 41}))
+
+        # Each input state produced one output state, so both must have
+        # been persisted in order.
+        assert [s["value"] for s in saved_states] == [2, 42]
+        assert all(s["port"] == 0 for s in saved_states)
+
+    @pytest.mark.timeout(2)
+    def test_process_input_state_does_not_save_when_no_output(
+        self,
+        main_loop,
+        monkeypatch,
+    ):
+        # When the executor returns no output state (process_state returned
+        # None), save_state_to_storage_if_needed must not be called -- no
+        # state means nothing to materialize.
+        save_calls: list[State] = []
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: 
None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [],
+        )
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state: save_calls.append(state),
+        )
+        # Pretend DataProc consumed the input but produced no output.
+        monkeypatch.setattr(main_loop, "_switch_context", lambda: None)
+
+        main_loop._process_state(State({"value": 1}))
+
+        assert save_calls == []
+
     @pytest.mark.timeout(2)
     def test_main_loop_thread_can_process_state(
         self,
diff --git a/amber/src/main/python/core/storage/document_factory.py 
b/amber/src/main/python/core/storage/document_factory.py
index 9b686ab66b..bd690ceb59 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -61,30 +61,35 @@ class DocumentFactory:
         if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
 
-            if resource_type in {VFSResourceType.RESULT}:
-                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
-                # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
-                # field name encoding
-                iceberg_schema = amber_schema_to_iceberg_schema(schema)
-
-                create_table(
-                    IcebergCatalogInstance.get_instance(),
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    iceberg_schema,
-                    override_if_exists=True,
-                )
-
-                return IcebergDocument[Tuple](
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    iceberg_schema,
-                    amber_tuples_to_arrow_table,
-                    arrow_table_to_amber_tuples,
-                )
-            else:
-                raise ValueError(f"Resource type {resource_type} is not 
supported")
+            match resource_type:
+                case VFSResourceType.RESULT:
+                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+                case VFSResourceType.STATE:
+                    namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+                case _:
+                    raise ValueError(f"Resource type {resource_type} is not 
supported")
+
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+            # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
+            # field name encoding
+            iceberg_schema = amber_schema_to_iceberg_schema(schema)
+
+            create_table(
+                IcebergCatalogInstance.get_instance(),
+                namespace,
+                storage_key,
+                iceberg_schema,
+                override_if_exists=True,
+            )
+
+            return IcebergDocument[Tuple](
+                namespace,
+                storage_key,
+                iceberg_schema,
+                amber_tuples_to_arrow_table,
+                arrow_table_to_amber_tuples,
+            )
+
         else:
             raise NotImplementedError(
                 f"Unsupported URI scheme: {parsed_uri.scheme} for creating the 
document"
@@ -96,30 +101,36 @@ class DocumentFactory:
         if parsed_uri.scheme == "vfs":
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
 
-            if resource_type in {VFSResourceType.RESULT}:
-                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
-                table = load_table_metadata(
-                    IcebergCatalogInstance.get_instance(),
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                )
-
-                if table is None:
-                    raise ValueError("No storage is found for the given URI")
-
-                amber_schema = Schema(table.schema().as_arrow())
-
-                document = IcebergDocument(
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    table.schema(),
-                    amber_tuples_to_arrow_table,
-                    arrow_table_to_amber_tuples,
-                )
-                return document, amber_schema
-            else:
-                raise ValueError(f"Resource type {resource_type} is not 
supported")
+            match resource_type:
+                case VFSResourceType.RESULT:
+                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+                case VFSResourceType.STATE:
+                    namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+                case _:
+                    raise ValueError(f"Resource type {resource_type} is not 
supported")
+
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+
+            table = load_table_metadata(
+                IcebergCatalogInstance.get_instance(),
+                namespace,
+                storage_key,
+            )
+
+            if table is None:
+                raise ValueError("No storage is found for the given URI")
+
+            amber_schema = Schema(table.schema().as_arrow())
+
+            document = IcebergDocument(
+                namespace,
+                storage_key,
+                table.schema(),
+                amber_tuples_to_arrow_table,
+                arrow_table_to_amber_tuples,
+            )
+            return document, amber_schema
+
         else:
             raise NotImplementedError(
                 f"Unsupported URI scheme: {parsed_uri.scheme} for opening the 
document"
diff --git 
a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py 
b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index 9b374f7d5c..032376ae31 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -23,6 +23,7 @@ from concurrent.futures import as_completed
 from concurrent.futures.thread import ThreadPoolExecutor
 
 from core.models import Schema, Tuple
+from core.models.state import State
 from core.storage.document_factory import DocumentFactory
 from core.storage.storage_config import StorageConfig
 from core.storage.vfs_uri_factory import VFSURIFactory
@@ -44,6 +45,7 @@ StorageConfig.initialize(
     rest_catalog_uri="http://localhost:8181/catalog/";,
     rest_catalog_warehouse_name="texera",
     table_result_namespace="operator-port-result",
+    table_state_namespace="operator-port-state",
     directory_path="../../../../../../amber/user-resources/workflow-results",
     commit_batch_size=4096,
     s3_endpoint="http://localhost:9000";,
@@ -317,3 +319,85 @@ class TestIcebergDocument:
         assert iceberg_document.get_count() == len(sample_items), (
             "get_count should return the same number as the length of 
sample_items"
         )
+
+    def test_state_materialization_round_trip(self):
+        operator_uuid = str(uuid.uuid4()).replace("-", "")
+        result_uri = VFSURIFactory.create_result_uri(
+            WorkflowIdentity(id=0),
+            ExecutionIdentity(id=0),
+            GlobalPortIdentity(
+                op_id=PhysicalOpIdentity(
+                    
logical_op_id=OperatorIdentity(id=f"test_state_{operator_uuid}"),
+                    layer_name="main",
+                ),
+                port_id=PortIdentity(id=0),
+                input=False,
+            ),
+        )
+        state_uri = State.uri_from_result_uri(result_uri)
+        DocumentFactory.create_document(state_uri, State.SCHEMA)
+        document, _ = DocumentFactory.open_document(state_uri)
+
+        state = State(
+            {
+                "loop_counter": 3,
+                "name": "outer-loop",
+                "payload": b"\x00\x01state-bytes",
+                "nested": {"enabled": True, "values": [1, 2, 3]},
+            }
+        )
+
+        writer = document.writer(str(uuid.uuid4()))
+        writer.open()
+        writer.put_one(state.to_tuple())
+        writer.close()
+
+        stored_rows = list(document.get())
+        assert len(stored_rows) == 1
+        assert State.from_tuple(stored_rows[0]) == state
+
+    def test_multiple_states_materialize_as_rows_in_one_table(self):
+        operator_uuid = str(uuid.uuid4()).replace("-", "")
+        result_uri = VFSURIFactory.create_result_uri(
+            WorkflowIdentity(id=0),
+            ExecutionIdentity(id=0),
+            GlobalPortIdentity(
+                op_id=PhysicalOpIdentity(
+                    logical_op_id=OperatorIdentity(
+                        id=f"test_multiple_states_{operator_uuid}"
+                    ),
+                    layer_name="main",
+                ),
+                port_id=PortIdentity(id=0),
+                input=False,
+            ),
+        )
+        state_uri = State.uri_from_result_uri(result_uri)
+        DocumentFactory.create_document(state_uri, State.SCHEMA)
+        document, _ = DocumentFactory.open_document(state_uri)
+
+        states = [
+            State({"loop_counter": 0, "i": 1, "payload": b"first"}),
+            State(
+                {
+                    "loop_counter": 1,
+                    "i": 2,
+                    "payload": b"second",
+                    "nested": {"values": [3, 4]},
+                }
+            ),
+        ]
+
+        writer = document.writer(str(uuid.uuid4()))
+        writer.open()
+        for state in states:
+            writer.put_one(state.to_tuple())
+        writer.close()
+
+        stored_rows = list(document.get())
+        assert len(stored_rows) == len(states)
+        actual_states = sorted(
+            [State.from_tuple(row) for row in stored_rows],
+            key=lambda state: state["loop_counter"],
+        )
+        assert actual_states == states
diff --git 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index e49c0316cc..bc2f069157 100644
--- 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -17,8 +17,8 @@
 
 import typing
 from loguru import logger
-from pyarrow import Table
 from typing import Union
+from pyarrow import Table
 
 from core.architecture.sendsemantics.broad_cast_partitioner import (
     BroadcastPartitioner,
@@ -34,7 +34,7 @@ from 
core.architecture.sendsemantics.range_based_shuffle_partitioner import (
 from core.architecture.sendsemantics.round_robin_partitioner import (
     RoundRobinPartitioner,
 )
-from core.models import Tuple, InternalQueue, DataFrame, DataPayload
+from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, 
StateFrame
 from core.models.internal_queue import DataElement, ECMElement
 from core.storage.document_factory import DocumentFactory
 from core.util import Stoppable, get_one_of
@@ -125,6 +125,15 @@ class InputPortMaterializationReaderRunnable(Runnable, 
Stoppable):
             if receiver == self.worker_actor_id:
                 yield self.tuples_to_data_frame(tuples)
 
+    def emit_state_with_filter(self, state: State) -> 
typing.Iterator[DataPayload]:
+        for receiver, payload in self.partitioner.flush_state(state):
+            if receiver == self.worker_actor_id:
+                yield (
+                    StateFrame(payload)
+                    if isinstance(payload, State)
+                    else self.tuples_to_data_frame(payload)
+                )
+
     def run(self) -> None:
         """
         Main execution logic that reads tuples from the materialized storage 
and
@@ -138,8 +147,21 @@ class InputPortMaterializationReaderRunnable(Runnable, 
Stoppable):
                 self.uri
             )
             self.emit_ecm("StartChannel", 
EmbeddedControlMessageType.NO_ALIGNMENT)
-            storage_iterator = self.materialization.get()
 
+            try:
+                state_document, _ = DocumentFactory.open_document(
+                    State.uri_from_result_uri(self.uri)
+                )
+                state_iterator = state_document.get()
+                for state in state_iterator:
+                    for state_frame in self.emit_state_with_filter(
+                        State.from_tuple(state)
+                    ):
+                        self.emit_payload(state_frame)
+            except ValueError:
+                pass
+
+            storage_iterator = self.materialization.get()
             # Iterate and process tuples.
             for tup in storage_iterator:
                 if self._stopped:
diff --git 
a/amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
 
b/amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
new file mode 100644
index 0000000000..3662d023f5
--- /dev/null
+++ 
b/amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
@@ -0,0 +1,190 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from core.models import State, StateFrame, Tuple
+from core.models.internal_queue import DataElement
+from core.models.schema import Schema
+from core.storage.runnables.input_port_materialization_reader_runnable import (
+    InputPortMaterializationReaderRunnable,
+)
+from proto.org.apache.texera.amber.core import (
+    ActorVirtualIdentity,
+    ChannelIdentity,
+)
+
+
+class TestEmitStateWithFilter:
+    """Cover the partitioner-filter logic for state payloads in
+    InputPortMaterializationReaderRunnable. These tests bypass __init__
+    so we don't need a real partitioner or storage URI.
+    """
+
+    @pytest.fixture
+    def me(self):
+        return ActorVirtualIdentity(name="me")
+
+    @pytest.fixture
+    def someone_else(self):
+        return ActorVirtualIdentity(name="other")
+
+    @pytest.fixture
+    def runnable(self, me):
+        # __new__ skips __init__ so we can wire only the fields we need.
+        instance = InputPortMaterializationReaderRunnable.__new__(
+            InputPortMaterializationReaderRunnable
+        )
+        instance.worker_actor_id = me
+        instance.partitioner = MagicMock()
+        instance.tuple_schema = Schema(raw_schema={"x": "INTEGER"})
+        return instance
+
+    def test_yields_state_frame_for_matching_receiver(self, runnable, me):
+        state = State({"k": 1})
+        runnable.partitioner.flush_state.return_value = [(me, state)]
+
+        frames = list(runnable.emit_state_with_filter(state))
+
+        assert len(frames) == 1
+        assert isinstance(frames[0], StateFrame)
+        assert frames[0].frame is state
+
+    def test_filters_out_non_matching_receivers(self, runnable, me, 
someone_else):
+        state = State({"k": 1})
+        runnable.partitioner.flush_state.return_value = [
+            (someone_else, state),
+            (me, state),
+            (someone_else, state),
+        ]
+
+        frames = list(runnable.emit_state_with_filter(state))
+
+        assert len(frames) == 1
+        assert isinstance(frames[0], StateFrame)
+
+    def test_yields_data_frame_for_non_state_payload(self, runnable, me):
+        # When the partitioner produces a tuple-batch payload 
(BroadcastPartitioner
+        # case), the runnable must convert it to a DataFrame instead of 
wrapping
+        # it as a StateFrame.
+        state = State({"k": 1})
+        tuples = [Tuple({"x": 7}, schema=runnable.tuple_schema)]
+        runnable.partitioner.flush_state.return_value = [(me, tuples)]
+
+        frames = list(runnable.emit_state_with_filter(state))
+
+        assert len(frames) == 1
+        # Should not be wrapped as a StateFrame.
+        assert not isinstance(frames[0], StateFrame)
+        assert frames[0].frame.num_rows == 1
+
+    def test_empty_partitioner_output_yields_nothing(self, runnable):
+        state = State({})
+        runnable.partitioner.flush_state.return_value = []
+
+        assert list(runnable.emit_state_with_filter(state)) == []
+
+
+class TestRunStateReadingBlock:
+    """Cover the inner try-block in run() that opens the state document and
+    emits its rows as StateFrames.
+    """
+
+    @pytest.fixture
+    def me(self):
+        return ActorVirtualIdentity(name="me")
+
+    @pytest.fixture
+    def runnable(self, me):
+        instance = InputPortMaterializationReaderRunnable.__new__(
+            InputPortMaterializationReaderRunnable
+        )
+        instance.uri = "vfs:///wf/0/exec/0/result/op-a"
+        instance.worker_actor_id = me
+        instance.tuple_schema = Schema(raw_schema={"x": "INTEGER"})
+        instance._stopped = False
+        instance._finished = False
+        instance.channel_id = ChannelIdentity(me, me, is_control=False)
+        instance.queue = MagicMock()
+        instance.partitioner = MagicMock()
+        # No tuple-batches and no ECM-flush payloads in these tests.
+        instance.partitioner.flush.return_value = []
+        return instance
+
+    def test_state_rows_are_emitted_as_state_frames(self, runnable, me):
+        state_a = State({"loop_counter": 0})
+        state_b = State({"loop_counter": 1})
+
+        # The state document yields opaque tuples; from_tuple deserializes
+        # them. Patch from_tuple so we don't have to wire a real serialization.
+        result_doc = MagicMock()
+        result_doc.get.return_value = iter([])  # No materialized tuples.
+        state_doc = MagicMock()
+        state_doc.get.return_value = iter(["row-a", "row-b"])
+
+        with (
+            patch(
+                
"core.storage.runnables.input_port_materialization_reader_runnable.DocumentFactory"
+            ) as mock_factory,
+            patch.object(State, "from_tuple") as mock_from_tuple,
+        ):
+            mock_factory.open_document.side_effect = [
+                (result_doc, runnable.tuple_schema),
+                (state_doc, None),
+            ]
+            mock_from_tuple.side_effect = [state_a, state_b]
+            runnable.partitioner.flush_state.side_effect = [
+                [(me, state_a)],
+                [(me, state_b)],
+            ]
+
+            runnable.run()
+
+        # Two StateFrames must have been put on the queue, in order.
+        state_frames = [
+            call.args[0]
+            for call in runnable.queue.put.call_args_list
+            if isinstance(call.args[0], DataElement)
+            and isinstance(call.args[0].payload, StateFrame)
+        ]
+        assert [sf.payload.frame for sf in state_frames] == [state_a, state_b]
+        assert runnable._finished is True
+
+    def test_missing_state_document_does_not_abort_run(self, runnable):
+        # The inner try is meant to swallow ValueError when no state document
+        # is provisioned; the outer run() should still finish cleanly.
+        result_doc = MagicMock()
+        result_doc.get.return_value = iter([])
+
+        with patch(
+            
"core.storage.runnables.input_port_materialization_reader_runnable.DocumentFactory"
+        ) as mock_factory:
+            mock_factory.open_document.side_effect = [
+                (result_doc, runnable.tuple_schema),
+                ValueError("no storage"),
+            ]
+
+            runnable.run()
+
+        assert runnable._finished is True
+        # No StateFrames should have been emitted.
+        for call in runnable.queue.put.call_args_list:
+            element = call.args[0]
+            if isinstance(element, DataElement):
+                assert not isinstance(element.payload, StateFrame)
diff --git a/amber/src/main/python/core/storage/storage_config.py 
b/amber/src/main/python/core/storage/storage_config.py
index 0e47bdb71a..8233590987 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -32,6 +32,7 @@ class StorageConfig:
     ICEBERG_REST_CATALOG_URI = None
     ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
     ICEBERG_TABLE_RESULT_NAMESPACE = None
+    ICEBERG_TABLE_STATE_NAMESPACE = None
     ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
     ICEBERG_TABLE_COMMIT_BATCH_SIZE = None
 
@@ -51,6 +52,7 @@ class StorageConfig:
         rest_catalog_uri,
         rest_catalog_warehouse_name,
         table_result_namespace,
+        table_state_namespace,
         directory_path,
         commit_batch_size,
         s3_endpoint,
@@ -71,6 +73,7 @@ class StorageConfig:
         cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name
 
         cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
+        cls.ICEBERG_TABLE_STATE_NAMESPACE = table_state_namespace
         cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
         cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
 
diff --git a/amber/src/main/python/core/storage/test_document_factory.py 
b/amber/src/main/python/core/storage/test_document_factory.py
new file mode 100644
index 0000000000..859c004024
--- /dev/null
+++ b/amber/src/main/python/core/storage/test_document_factory.py
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from core.models import Schema
+from core.storage.document_factory import DocumentFactory
+from core.storage.storage_config import StorageConfig
+from core.storage.vfs_uri_factory import VFSResourceType
+
+
+# Avoid initializing the real config (only initializable once per process).
+StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE = "test-result-ns"
+StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE = "test-state-ns"
+
+VFS_URI = "vfs:///wid/0/eid/0/opid/test/main/0/0/result"
+
+
[email protected]
+def schema():
+    return Schema(raw_schema={"x": "INTEGER"})
+
+
+def _decode_returning(resource_type):
+    """Helper: build a VFSURIFactory.decode_uri side_effect."""
+    return lambda _uri: (None, None, None, resource_type)
+
+
+@patch("core.storage.document_factory.IcebergDocument")
+@patch("core.storage.document_factory.amber_schema_to_iceberg_schema")
+@patch("core.storage.document_factory.create_table")
+@patch("core.storage.document_factory.IcebergCatalogInstance")
+@patch("core.storage.document_factory.VFSURIFactory")
+class TestCreateDocumentNamespaceRouting:
+    def test_state_resource_type_uses_state_namespace(
+        self, mock_vfs, _icb, mock_create_table, _amber_schema, _doc, schema
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = 
_decode_returning(VFSResourceType.STATE)
+
+        DocumentFactory.create_document(VFS_URI, schema)
+
+        args, _ = mock_create_table.call_args
+        assert args[1] == StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+
+    def test_result_resource_type_uses_result_namespace(
+        self, mock_vfs, _icb, mock_create_table, _amber_schema, _doc, schema
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = 
_decode_returning(VFSResourceType.RESULT)
+
+        DocumentFactory.create_document(VFS_URI, schema)
+
+        args, _ = mock_create_table.call_args
+        assert args[1] == StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+
+    def test_unsupported_resource_type_raises_value_error(
+        self, mock_vfs, _icb, _create_table, _amber_schema, _doc, schema
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        # CONSOLE_MESSAGES has no namespace mapping in the Python factory.
+        mock_vfs.decode_uri.side_effect = _decode_returning(
+            VFSResourceType.CONSOLE_MESSAGES
+        )
+
+        with pytest.raises(ValueError, match="not supported"):
+            DocumentFactory.create_document(VFS_URI, schema)
+
+
+def test_create_document_rejects_non_vfs_scheme(schema):
+    with pytest.raises(NotImplementedError, match="Unsupported URI scheme"):
+        DocumentFactory.create_document("file:///tmp/x", schema)
+
+
+@patch("core.storage.document_factory.IcebergDocument")
+@patch("core.storage.document_factory.Schema")
+@patch("core.storage.document_factory.load_table_metadata")
+@patch("core.storage.document_factory.IcebergCatalogInstance")
+@patch("core.storage.document_factory.VFSURIFactory")
+class TestOpenDocumentNamespaceRouting:
+    @staticmethod
+    def _stub_table():
+        table = MagicMock()
+        table.schema.return_value.as_arrow.return_value = MagicMock()
+        return table
+
+    def test_state_resource_type_uses_state_namespace(
+        self, mock_vfs, _icb, mock_load, _schema_cls, _doc
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = 
_decode_returning(VFSResourceType.STATE)
+        mock_load.return_value = self._stub_table()
+
+        DocumentFactory.open_document(VFS_URI)
+
+        args, _ = mock_load.call_args
+        assert args[1] == StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+
+    def test_unsupported_resource_type_raises_value_error(
+        self, mock_vfs, _icb, _load, _schema_cls, _doc
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(
+            VFSResourceType.CONSOLE_MESSAGES
+        )
+
+        with pytest.raises(ValueError, match="not supported"):
+            DocumentFactory.open_document(VFS_URI)
+
+    def test_missing_table_raises_value_error(
+        self, mock_vfs, _icb, mock_load, _schema_cls, _doc
+    ):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = 
_decode_returning(VFSResourceType.STATE)
+        mock_load.return_value = None
+
+        with pytest.raises(ValueError, match="No storage is found"):
+            DocumentFactory.open_document(VFS_URI)
diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py 
b/amber/src/main/python/core/storage/vfs_uri_factory.py
index de0c5db56e..0e23e60705 100644
--- a/amber/src/main/python/core/storage/vfs_uri_factory.py
+++ b/amber/src/main/python/core/storage/vfs_uri_factory.py
@@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
     RESULT = "result"
     RUNTIME_STATISTICS = "runtimeStatistics"
     CONSOLE_MESSAGES = "consoleMessages"
+    STATE = "state"
 
 
 class VFSURIFactory:
diff --git 
a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py 
b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
index 64c7080e52..1942e91f8b 100644
--- a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
@@ -34,6 +34,7 @@ class TestLargeBinaryManager:
                 rest_catalog_uri="http://localhost:8181/catalog/";,
                 rest_catalog_warehouse_name="texera",
                 table_result_namespace="test",
+                table_state_namespace="test-state",
                 directory_path="/tmp/test",
                 commit_batch_size=1000,
                 s3_endpoint="http://localhost:9000";,
diff --git a/amber/src/main/python/texera_run_python_worker.py 
b/amber/src/main/python/texera_run_python_worker.py
index 8687298f81..9b21fa5334 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -52,6 +52,7 @@ if __name__ == "__main__":
         iceberg_rest_catalog_uri,
         iceberg_rest_catalog_warehouse_name,
         iceberg_table_namespace,
+        iceberg_table_state_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
         s3_endpoint,
@@ -68,6 +69,7 @@ if __name__ == "__main__":
         iceberg_rest_catalog_uri,
         iceberg_rest_catalog_warehouse_name,
         iceberg_table_namespace,
+        iceberg_table_state_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
         s3_endpoint,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 4ab3d18056..80f22ace79 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -124,6 +124,9 @@ class OutputManager(
       : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
+  private val stateWriters: mutable.HashMap[PortIdentity, 
BufferedItemWriter[Tuple]] =
+    mutable.HashMap()
+
   /**
     * Add down stream operator and its corresponding Partitioner.
     *
@@ -232,6 +235,10 @@ class OutputManager(
     })
   }
 
+  def saveStateToStorageIfNeeded(state: State): Unit = {
+    stateWriters.values.foreach(_.putOne(state.toTuple))
+  }
+
   /**
     * Singal the port storage writer to flush the remaining buffer and wait 
for commits to finish so that
     * the output port is properly completed. If the output port does not need 
storage, no action will be done.
@@ -245,7 +252,7 @@ class OutputManager(
         writerThread.join()
       case None =>
     }
-
+    this.stateWriters.remove(outputPortId).foreach(_.close())
   }
 
   def getPort(portId: PortIdentity): WorkerPort = ports(portId)
@@ -288,6 +295,16 @@ class OutputManager(
     val writerThread = new OutputPortResultWriterThread(bufferedItemWriter)
     this.outputPortResultWriterThreads(portId) = writerThread
     writerThread.start()
+
+    // The state document is provisioned alongside the result document
+    // by RegionExecutionCoordinator, so it is always present.
+    val stateWriter = DocumentFactory
+      .openDocument(State.uriFromResultUri(storageUri))
+      ._1
+      .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+      .asInstanceOf[BufferedItemWriter[Tuple]]
+    stateWriter.open()
+    this.stateWriters(portId) = stateWriter
   }
 
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index 4ff5ff15ae..3358e31e65 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -187,6 +187,7 @@ class PythonWorkflowWorker(
         if (isRest) StorageConfig.icebergRESTCatalogUri else "",
         if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "",
         StorageConfig.icebergTableResultNamespace,
+        StorageConfig.icebergTableStateNamespace,
         StorageConfig.fileStorageDirectoryPath.toString,
         StorageConfig.icebergTableCommitBatchSize.toString,
         StorageConfig.s3Endpoint,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 254c16bf34..58fdf9f242 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
 import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, 
Return, Throw, Timer}
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -569,12 +570,14 @@ class RegionExecutionCoordinator(
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
         val storageUriToAdd = portConfig.storageURI
+        val stateUriToAdd = State.uriFromResultUri(storageUriToAdd)
         val (_, eid, _, _) = decodeURI(storageUriToAdd)
         val schemaOptional =
           
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is 
missing"))
         DocumentFactory.createDocument(storageUriToAdd, schema)
+        DocumentFactory.createDocument(stateUriToAdd, State.schema)
         if (!isRestart) {
           WorkflowExecutionsResource.insertOperatorPortResultUri(
             eid = eid,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index 84f1e8ec65..b6c0c39aaf 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -126,6 +126,7 @@ class DataProcessor(
       val outputState = executor.processState(state, port)
       if (outputState.isDefined) {
         outputManager.emitState(outputState.get)
+        outputManager.saveStateToStorageIfNeeded(outputState.get)
       }
     } catch safely {
       case e =>
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 10fbbc44a2..90de86e1fd 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -21,6 +21,7 @@ package 
org.apache.texera.amber.engine.architecture.worker.managers
 
 import io.grpc.MethodDescriptor
 import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
@@ -45,7 +46,11 @@ import 
org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{
   DPInputQueueElement,
   FIFOMessageElement
 }
-import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, 
WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.ambermessage.{
+  DataFrame,
+  StateFrame,
+  WorkflowFIFOMessage
+}
 import 
org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage
 
 import java.net.URI
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
       }
       // Flush any remaining tuples in the buffer.
       if (buffer.nonEmpty) flush()
+
+      try {
+        val state_document =
+          DocumentFactory
+            .openDocument(State.uriFromResultUri(uri))
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+        val stateReadIterator = state_document.get()
+
+        while (stateReadIterator.hasNext) {
+          val state = State.fromTuple(stateReadIterator.next())
+          inputMessageQueue.put(
+            FIFOMessageElement(WorkflowFIFOMessage(channelId, 
getSequenceNumber, StateFrame(state)))
+          )
+        }
+      } catch {
+        case _: Exception =>
+      }
+
       emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT)
       isFinished.set(true)
     } catch {
diff --git a/common/config/src/main/resources/storage.conf 
b/common/config/src/main/resources/storage.conf
index 1f39359155..da2f7ccc19 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -61,6 +61,9 @@ storage {
             runtime-statistics-namespace = "workflow-runtime-statistics"
             runtime-statistics-namespace = 
${?STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE}
 
+            state-namespace = "operator-port-state"
+            state-namespace = ${?STORAGE_ICEBERG_TABLE_STATE_NAMESPACE}
+
             commit {
                 batch-size = 4096 # decide the buffer size of our 
IcebergTableWriter
                 batch-size = ${?STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE}
diff --git 
a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
 
b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
index 9ec52bba65..123c56505e 100644
--- 
a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
+++ 
b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
@@ -67,6 +67,7 @@ object EnvironmentalVariable {
     "STORAGE_ICEBERG_TABLE_CONSOLE_MESSAGES_NAMESPACE"
   val ENV_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE =
     "STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE"
+  val ENV_ICEBERG_TABLE_STATE_NAMESPACE = 
"STORAGE_ICEBERG_TABLE_STATE_NAMESPACE"
   val ENV_ICEBERG_TABLE_COMMIT_BATCH_SIZE = 
"STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE"
   val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = 
"STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES"
   val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = 
"STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS"
diff --git 
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
 
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 728e3c0c2d..07447cfdbe 100644
--- 
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++ 
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -54,6 +54,8 @@ object StorageConfig {
     conf.getString("storage.iceberg.table.console-messages-namespace")
   val icebergTableRuntimeStatisticsNamespace: String =
     conf.getString("storage.iceberg.table.runtime-statistics-namespace")
+  val icebergTableStateNamespace: String =
+    conf.getString("storage.iceberg.table.state-namespace")
   val icebergTableCommitBatchSize: Int =
     conf.getInt("storage.iceberg.table.commit.batch-size")
   val icebergTableCommitNumRetries: Int =
@@ -111,6 +113,7 @@ object StorageConfig {
     "STORAGE_ICEBERG_TABLE_CONSOLE_MESSAGES_NAMESPACE"
   val ENV_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE =
     "STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE"
+  val ENV_ICEBERG_TABLE_STATE_NAMESPACE = 
"STORAGE_ICEBERG_TABLE_STATE_NAMESPACE"
   val ENV_ICEBERG_TABLE_COMMIT_BATCH_SIZE = 
"STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE"
   val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = 
"STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES"
   val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = 
"STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS"
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index ba146f1d57..532f355c17 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
+import java.net.URI
 import java.util.Base64
 import scala.jdk.CollectionConverters.IteratorHasAsScala
 
@@ -57,6 +58,9 @@ object State {
 
   def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))
 
+  def uriFromResultUri(resultUri: URI): URI =
+    new URI(resultUri.toString.replace("/result", "/state"))
+
   private def toJsonValue(value: Any): Any =
     value match {
       case null => null
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 15949ef471..00f6c70ba7 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -72,6 +72,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => 
StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => 
StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => StorageConfig.icebergTableStateNamespace
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType 
is not supported")
         }
@@ -119,6 +120,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => 
StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => 
StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => StorageConfig.icebergTableStateNamespace
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType 
is not supported")
         }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
index 0fbee64457..e687b28a29 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
@@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration {
   val RESULT: Value = Value("result")
   val RUNTIME_STATISTICS: Value = Value("runtimeStatistics")
   val CONSOLE_MESSAGES: Value = Value("consoleMessages")
+  val STATE: Value = Value("state")
 }
 
 object VFSURIFactory {
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 8fdf039f3e..7f1d8573c2 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -20,6 +20,7 @@
 package org.apache.texera.amber.storage.result.iceberg
 
 import org.apache.texera.amber.config.StorageConfig
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.model.{VirtualDocument, 
VirtualDocumentSpec}
 import org.apache.texera.amber.core.storage.{DocumentFactory, 
IcebergCatalogInstance, VFSURIFactory}
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
@@ -141,6 +142,84 @@ class IcebergDocumentSpec extends 
VirtualDocumentSpec[Tuple] with BeforeAndAfter
     }
   }
 
+  it should "round trip materialized state documents" in {
+    val stateUri = State.uriFromResultUri(uri)
+    DocumentFactory.createDocument(stateUri, State.schema)
+    val stateDocument =
+      
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+    val state = State(
+      Map(
+        "loop_counter" -> 3,
+        "name" -> "outer-loop",
+        "payload" -> Array[Byte](0, 1, 2, 3),
+        "nested" -> Map("enabled" -> true, "values" -> List(1, 2, 3))
+      )
+    )
+
+    val writer = stateDocument.writer(UUID.randomUUID().toString)
+    writer.open()
+    writer.putOne(state.toTuple)
+    writer.close()
+
+    val storedRows = stateDocument.get().toList
+    assert(storedRows.length == 1)
+    val deserialized = State.fromTuple(storedRows.head).values
+    assert(deserialized("loop_counter") == 3L)
+    assert(deserialized("name") == "outer-loop")
+    
assert(deserialized("payload").asInstanceOf[Array[Byte]].sameElements(Array[Byte](0,
 1, 2, 3)))
+    assert(deserialized("nested").asInstanceOf[Map[String, Any]]("enabled") == 
true)
+    assert(deserialized("nested").asInstanceOf[Map[String, Any]]("values") == 
List(1L, 2L, 3L))
+  }
+
+  it should "materialize multiple states as rows in one state table" in {
+    val stateUri = State.uriFromResultUri(uri)
+    DocumentFactory.createDocument(stateUri, State.schema)
+    val stateDocument =
+      
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+    val states: List[State] = List(
+      State(Map("loop_counter" -> 0, "i" -> 1, "payload" -> Array[Byte](1, 2, 
3))),
+      State(
+        Map(
+          "loop_counter" -> 1,
+          "i" -> 2,
+          "payload" -> Array[Byte](4, 5, 6),
+          "nested" -> Map("values" -> List(3, 4))
+        )
+      )
+    )
+
+    val writer = stateDocument.writer(UUID.randomUUID().toString)
+    writer.open()
+    states.foreach(state => writer.putOne(state.toTuple))
+    writer.close()
+
+    val deserializedStates =
+      stateDocument
+        .get()
+        .toList
+        .map(State.fromTuple)
+        .sortBy(_.values("loop_counter").asInstanceOf[Long])
+    assert(deserializedStates.length == states.length)
+    deserializedStates.zip(states).foreach {
+      case (actual, expected) =>
+        assert(
+          actual.values("loop_counter") == 
expected.values("loop_counter").asInstanceOf[Int].toLong
+        )
+        assert(actual.values("i") == 
expected.values("i").asInstanceOf[Int].toLong)
+        assert(
+          actual
+            .values("payload")
+            .asInstanceOf[Array[Byte]]
+            .sameElements(expected.values("payload").asInstanceOf[Array[Byte]])
+        )
+    }
+    assert(
+      deserializedStates(1)
+        .values("nested")
+        .asInstanceOf[Map[String, Any]]("values") == List(3L, 4L)
+    )
+  }
+
   /** Returns a dynamic proxy for `realTable` that increments `counter` on 
every `refresh()` call. */
   private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): 
Table =
     Proxy

Reply via email to