This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 84bd376983dc933a567cf9a75576358a5bd52e1f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 23 00:48:33 2026 -0700

    refactor: keep only state changes on top of main
---
 .../core/architecture/packaging/output_manager.py  |  38 +++++++-
 amber/src/main/python/core/models/operator.py      |   6 +-
 amber/src/main/python/core/models/state.py         |  97 ++++++++++---------
 .../main/python/core/runnables/network_receiver.py |  15 ++-
 .../main/python/core/runnables/network_sender.py   |  23 +++--
 .../main/python/core/runnables/test_main_loop.py   |  59 ++++++++++++
 .../main/python/core/storage/document_factory.py   | 107 ++++++++++++---------
 .../input_port_materialization_reader_runnable.py  |  29 +++++-
 .../main/python/core/storage/vfs_uri_factory.py    |   1 +
 .../messaginglayer/OutputManager.scala             |  20 ++++
 .../pythonworker/PythonProxyClient.scala           |   3 +-
 .../pythonworker/PythonProxyServer.scala           |   2 +-
 .../scheduling/RegionExecutionCoordinator.scala    |  58 +++++++++--
 .../engine/architecture/worker/DataProcessor.scala |   1 +
 .../InputPortMaterializationReaderThread.scala     |  26 ++++-
 .../amber/core/executor/OperatorExecutor.scala     |   8 +-
 .../org/apache/texera/amber/core/state/State.scala |  83 +++++++++++-----
 .../apache/texera/amber/core/state/package.scala   |  24 +++++
 .../amber/core/storage/DocumentFactory.scala       |   2 +
 .../texera/amber/core/storage/VFSURIFactory.scala  |   1 +
 .../amber/operator/ifStatement/IfOpExec.scala      |   2 +-
 21 files changed, 444 insertions(+), 161 deletions(-)

diff --git 
a/amber/src/main/python/core/architecture/packaging/output_manager.py 
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index bf4afbf396..065b063f7d 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -17,6 +17,7 @@
 
 import threading
 import typing
+import uuid
 from collections import OrderedDict
 from itertools import chain
 from loguru import logger
@@ -43,7 +44,12 @@ from core.architecture.sendsemantics.round_robin_partitioner 
import (
 )
 from core.models import Tuple, Schema, StateFrame
 from core.models.payload import DataPayload, DataFrame
-from core.models.state import State
+from core.models.state import (
+    State,
+    STATE_SCHEMA,
+    serialize_state,
+    state_uri_from_result_uri,
+)
 from core.storage.document_factory import DocumentFactory
 from core.storage.runnables.port_storage_writer import (
     PortStorageWriter,
@@ -87,6 +93,8 @@ class OutputManager:
             PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
         ] = dict()
 
+        self._storage_uris: typing.Dict[PortIdentity, str] = dict()
+
     def is_missing_output_ports(self):
         """
         This method is only used for ensuring correct region execution.
@@ -126,6 +134,7 @@ class OutputManager:
         Create a separate thread for saving output tuples of a port
         to storage in batch.
         """
+        self._storage_uris[port_id] = storage_uri
         document, _ = DocumentFactory.open_document(storage_uri)
         buffered_item_writer = 
document.writer(str(get_worker_index(self.worker_id)))
         writer_queue = Queue()
@@ -171,6 +180,31 @@ class OutputManager:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
+    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> 
None:
+        if port_id is None:
+            uris = self._storage_uris.values()
+        elif port_id in self._storage_uris:
+            uris = [self._storage_uris[port_id]]
+        else:
+            return
+
+        for uri in uris:
+            state_uri = state_uri_from_result_uri(uri)
+            try:
+                document = DocumentFactory.open_document(state_uri)[0]
+            except ValueError:
+                document = DocumentFactory.create_document(state_uri, 
STATE_SCHEMA)
+            writer = document.writer(str(uuid.uuid4()))
+            writer.put_one(serialize_state(state))
+            writer.close()
+
+    def reset_output_storage(self) -> None:
+        port_id = self.get_port_ids()[0]
+        storage_uri = self._storage_uris[port_id]
+        self.close_port_storage_writers()
+        DocumentFactory.create_document(storage_uri, 
self._ports[port_id].get_schema())
+        self.set_up_port_storage_writer(port_id, storage_uri)
+
     def close_port_storage_writers(self) -> None:
         """
         Flush the buffers of port storage writers and wait for all the
@@ -248,7 +282,7 @@ class OutputManager:
                         receiver,
                         (
                             StateFrame(payload)
-                            if isinstance(payload, State)
+                            if isinstance(payload, dict)
                             else self.tuple_to_frame(payload)
                         ),
                     )
diff --git a/amber/src/main/python/core/models/operator.py 
b/amber/src/main/python/core/models/operator.py
index 7905083995..91c5b2cf27 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -108,14 +108,12 @@ class Operator(ABC):
     def process_state(self, state: State, port: int) -> Optional[State]:
         """
         Process an input State from the given link.
-        The default implementation is to pass the State to all downstream 
operators
-        if the State has pass_to_all_downstream set to True.
+        The default implementation is to pass the State to downstream 
operators.
         :param state: State, a State from an input port to be processed.
         :param port: int, input port index of the current exhausted port.
         :return: State, producing one State object
         """
-        if state.passToAllDownstream:
-            return state
+        return state
 
     def produce_state_on_start(self, port: int) -> State:
         """
diff --git a/amber/src/main/python/core/models/state.py 
b/amber/src/main/python/core/models/state.py
index 2c8a268dfb..e5726cc3c2 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -15,61 +15,64 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from dataclasses import dataclass
-from pandas import DataFrame
-from pyarrow import Table
-from typing import Optional
+import base64
+import json
+from typing import Any, Dict, TypeAlias
 
-from .schema import Schema, AttributeType
-from .schema.attribute_type import FROM_PYOBJECT_MAPPING
+from .schema import Schema
+from .tuple import Tuple
 
+State: TypeAlias = Dict[str, Any]
 
-@dataclass
-class State:
-    def __init__(
-        self, table: Optional[Table] = None, pass_to_all_downstream: bool = 
False
-    ):
-        self.schema = Schema()
-        self.passToAllDownstream = pass_to_all_downstream
-        if table is not None:
-            self.__dict__.update(table.to_pandas().iloc[0].to_dict())
-            self.schema = Schema(table.schema)
+STATE_CONTENT = "content"
+_TYPE_MARKER = "__texera_type__"
+_PAYLOAD_MARKER = "payload"
+_BYTES_TYPE = "bytes"
 
-    def add(
-        self, key: str, value: any, value_type: Optional[AttributeType] = None
-    ) -> None:
-        self.__dict__[key] = value
-        if value_type is not None:
-            self.schema.add(key, value_type)
-        elif key != "schema":
-            self.schema.add(key, FROM_PYOBJECT_MAPPING[type(value)])
+STATE_SCHEMA = Schema(raw_schema={STATE_CONTENT: "STRING"})
 
-    def get(self, key: str) -> any:
-        return self.__dict__[key]
 
-    def to_table(self) -> Table:
-        return Table.from_pandas(
-            df=DataFrame([self.__dict__]),
-            schema=self.schema.as_arrow_schema(),
-        )
+def state_uri_from_result_uri(result_uri: str) -> str:
+    return result_uri.replace("/result", "/state")
 
-    def __setattr__(self, key: str, value: any) -> None:
-        self.add(key, value)
 
-    def __setitem__(self, key: str, value: any) -> None:
-        self.add(key, value)
+def serialize_state(state: State) -> Tuple:
+    return Tuple(
+        {
+            STATE_CONTENT: json.dumps(
+                _to_json_value(state), separators=(",", ":")
+            )
+        },
+        schema=STATE_SCHEMA,
+    )
 
-    def __getitem__(self, key: str) -> any:
-        return self.get(key)
 
-    def __str__(self) -> str:
-        content = ", ".join(
-            [
-                repr(key) + ": " + repr(value)
-                for key, value in self.__dict__.items()
-                if key != "schema"
-            ]
-        )
-        return f"State[{content}]"
+def deserialize_state(row: Tuple) -> State:
+    return _from_json_value(json.loads(row[STATE_CONTENT]))
 
-    __repr__ = __str__
+
+def _to_json_value(value: Any) -> Any:
+    if value is None or isinstance(value, (bool, int, float, str)):
+        return value
+    if isinstance(value, bytes):
+        return {
+            _TYPE_MARKER: _BYTES_TYPE,
+            _PAYLOAD_MARKER: base64.b64encode(value).decode("ascii"),
+        }
+    if isinstance(value, dict):
+        return {str(key): _to_json_value(inner) for key, inner in 
value.items()}
+    if isinstance(value, (list, tuple)):
+        return [_to_json_value(inner) for inner in value]
+    raise TypeError(
+        f"State value of type {type(value).__name__} is not JSON serializable"
+    )
+
+
+def _from_json_value(value: Any) -> Any:
+    if isinstance(value, list):
+        return [_from_json_value(inner) for inner in value]
+    if isinstance(value, dict):
+        if value.get(_TYPE_MARKER) == _BYTES_TYPE:
+            return base64.b64decode(value[_PAYLOAD_MARKER])
+        return {key: _from_json_value(inner) for key, inner in value.items()}
+    return value
diff --git a/amber/src/main/python/core/runnables/network_receiver.py 
b/amber/src/main/python/core/runnables/network_receiver.py
index fd42a8f589..e1815b08f7 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -32,6 +32,7 @@ from 
core.architecture.handlers.actorcommand.credit_update_handler import (
 )
 from core.models import (
     DataFrame,
+    Tuple,
     StateFrame,
 )
 from core.models.internal_queue import (
@@ -40,8 +41,8 @@ from core.models.internal_queue import (
     InternalQueue,
     ECMElement,
 )
-from core.models.state import State
 from core.proxy import ProxyServer
+from core.models.state import STATE_SCHEMA, deserialize_state
 from core.util import Stoppable, get_one_of
 from core.util.runnable.runnable import Runnable
 from proto.org.apache.texera.amber.engine.architecture.rpc import 
EmbeddedControlMessage
@@ -96,7 +97,17 @@ class NetworkReceiver(Runnable, Stoppable):
                 "Data",
                 lambda _: DataFrame(table),
                 "State",
-                lambda _: StateFrame(State(table)),
+                lambda _: StateFrame(
+                    deserialize_state(
+                        Tuple(
+                            {
+                                name: table[name][0].as_py()
+                                for name in STATE_SCHEMA.get_attr_names()
+                            },
+                            schema=STATE_SCHEMA,
+                        )
+                    )
+                ),
                 "ECM",
                 lambda _: 
EmbeddedControlMessage().parse(table["payload"][0].as_py()),
             )
diff --git a/amber/src/main/python/core/runnables/network_sender.py 
b/amber/src/main/python/core/runnables/network_sender.py
index 9595433fb7..f1bd8659ee 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -20,13 +20,18 @@ from loguru import logger
 from overrides import overrides
 from typing import Optional
 
-from core.models import DataPayload, InternalQueue, DataFrame, StateFrame, 
State
+from core.models import DataPayload, InternalQueue, DataFrame, StateFrame
 from core.models.internal_queue import (
     InternalQueueElement,
     DataElement,
     DCMElement,
     ECMElement,
 )
+from core.models.state import (
+    STATE_CONTENT,
+    STATE_SCHEMA,
+    serialize_state,
+)
 from core.proxy import ProxyClient
 from core.util import StoppableQueueBlockingRunnable
 from proto.org.apache.texera.amber.core import ChannelIdentity
@@ -98,13 +103,15 @@ class NetworkSender(StoppableQueueBlockingRunnable):
             data_header = PythonDataHeader(tag=to, payload_type="Data")
             self._proxy_client.send_data(bytes(data_header), 
data_payload.frame)
         elif isinstance(data_payload, StateFrame):
-            data_header = PythonDataHeader(
-                tag=to, payload_type=data_payload.frame.__class__.__name__
-            )
-            table = (
-                data_payload.frame.to_table()
-                if isinstance(data_payload.frame, State)
-                else None
+            data_header = PythonDataHeader(tag=to, payload_type="State")
+            serialized_state = serialize_state(data_payload.frame)
+            table = pa.Table.from_pydict(
+                {
+                    STATE_CONTENT: [
+                        serialized_state[STATE_CONTENT]
+                    ],
+                },
+                schema=STATE_SCHEMA.as_arrow_schema(),
             )
             self._proxy_client.send_data(bytes(data_header), table)
         else:
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py 
b/amber/src/main/python/core/runnables/test_main_loop.py
index 5ad0afec9b..3ff0367208 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -25,6 +25,8 @@ from threading import Thread
 from core.models import (
     DataFrame,
     InternalQueue,
+    State,
+    StateFrame,
     Tuple,
 )
 from core.models.internal_queue import (
@@ -1077,6 +1079,63 @@ class TestMainLoop:
             ),
         )
 
+    @pytest.mark.timeout(2)
+    def test_process_state_can_emit_multiple_states(
+        self,
+        main_loop,
+        output_queue,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                return {"value": state["value"] + 1, "port": port}
+
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: 
None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
+        )
+
+        switch_count = {"value": 0}
+
+        def fake_switch_context():
+            switch_count["value"] += 1
+            # xinyuan-state-only still uses the original two-switch state 
handshake:
+            # the DataProcessor produces output during the first switch of each
+            # process_input_state() call, before MainLoop reads 
current_output_state.
+            if switch_count["value"] % 2 == 1:
+                current_input_state = (
+                    
main_loop.context.state_processing_manager.current_input_state
+                )
+                
main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        first_state = {"value": 1}
+        second_state = {"value": 41}
+
+        main_loop._process_state(first_state)
+        main_loop._process_state(second_state)
+
+        first_output: DataElement = output_queue.get()
+        second_output: DataElement = output_queue.get()
+
+        assert first_output.tag == mock_data_output_channel
+        assert isinstance(first_output.payload, StateFrame)
+        assert first_output.payload.frame["value"] == 2
+        assert first_output.payload.frame["port"] == 0
+
+        assert second_output.tag == mock_data_output_channel
+        assert isinstance(second_output.payload, StateFrame)
+        assert second_output.payload.frame["value"] == 42
+        assert second_output.payload.frame["port"] == 0
+
     @pytest.mark.timeout(5)
     def test_main_loop_thread_can_align_ecm(
         self,
diff --git a/amber/src/main/python/core/storage/document_factory.py 
b/amber/src/main/python/core/storage/document_factory.py
index 9b686ab66b..8a4d6fe3c5 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -61,30 +61,35 @@ class DocumentFactory:
         if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
 
-            if resource_type in {VFSResourceType.RESULT}:
-                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
-                # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
-                # field name encoding
-                iceberg_schema = amber_schema_to_iceberg_schema(schema)
-
-                create_table(
-                    IcebergCatalogInstance.get_instance(),
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    iceberg_schema,
-                    override_if_exists=True,
-                )
-
-                return IcebergDocument[Tuple](
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    iceberg_schema,
-                    amber_tuples_to_arrow_table,
-                    arrow_table_to_amber_tuples,
-                )
-            else:
-                raise ValueError(f"Resource type {resource_type} is not 
supported")
+            match resource_type:
+                case VFSResourceType.RESULT:
+                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+                case VFSResourceType.STATE:
+                    namespace = "state"
+                case _:
+                    raise ValueError(f"Resource type {resource_type} is not 
supported")
+
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+            # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
+            # field name encoding
+            iceberg_schema = amber_schema_to_iceberg_schema(schema)
+
+            create_table(
+                IcebergCatalogInstance.get_instance(),
+                namespace,
+                storage_key,
+                iceberg_schema,
+                override_if_exists=True,
+            )
+
+            return IcebergDocument[Tuple](
+                namespace,
+                storage_key,
+                iceberg_schema,
+                amber_tuples_to_arrow_table,
+                arrow_table_to_amber_tuples,
+            )
+
         else:
             raise NotImplementedError(
                 f"Unsupported URI scheme: {parsed_uri.scheme} for creating the 
document"
@@ -96,30 +101,36 @@ class DocumentFactory:
         if parsed_uri.scheme == "vfs":
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
 
-            if resource_type in {VFSResourceType.RESULT}:
-                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
-                table = load_table_metadata(
-                    IcebergCatalogInstance.get_instance(),
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                )
-
-                if table is None:
-                    raise ValueError("No storage is found for the given URI")
-
-                amber_schema = Schema(table.schema().as_arrow())
-
-                document = IcebergDocument(
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    table.schema(),
-                    amber_tuples_to_arrow_table,
-                    arrow_table_to_amber_tuples,
-                )
-                return document, amber_schema
-            else:
-                raise ValueError(f"Resource type {resource_type} is not 
supported")
+            match resource_type:
+                case VFSResourceType.RESULT:
+                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+                case VFSResourceType.STATE:
+                    namespace = "state"
+                case _:
+                    raise ValueError(f"Resource type {resource_type} is not 
supported")
+
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+
+            table = load_table_metadata(
+                IcebergCatalogInstance.get_instance(),
+                namespace,
+                storage_key,
+            )
+
+            if table is None:
+                raise ValueError("No storage is found for the given URI")
+
+            amber_schema = Schema(table.schema().as_arrow())
+
+            document = IcebergDocument(
+                namespace,
+                storage_key,
+                table.schema(),
+                amber_tuples_to_arrow_table,
+                arrow_table_to_amber_tuples,
+            )
+            return document, amber_schema
+
         else:
             raise NotImplementedError(
                 f"Unsupported URI scheme: {parsed_uri.scheme} for opening the 
document"
diff --git 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index e49c0316cc..a600f87857 100644
--- 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -17,8 +17,8 @@
 
 import typing
 from loguru import logger
-from pyarrow import Table
 from typing import Union
+from pyarrow import Table
 
 from core.architecture.sendsemantics.broad_cast_partitioner import (
     BroadcastPartitioner,
@@ -34,8 +34,9 @@ from 
core.architecture.sendsemantics.range_based_shuffle_partitioner import (
 from core.architecture.sendsemantics.round_robin_partitioner import (
     RoundRobinPartitioner,
 )
-from core.models import Tuple, InternalQueue, DataFrame, DataPayload
+from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, 
StateFrame
 from core.models.internal_queue import DataElement, ECMElement
+from core.models.state import deserialize_state, state_uri_from_result_uri
 from core.storage.document_factory import DocumentFactory
 from core.util import Stoppable, get_one_of
 from core.util.runnable.runnable import Runnable
@@ -125,6 +126,15 @@ class InputPortMaterializationReaderRunnable(Runnable, 
Stoppable):
             if receiver == self.worker_actor_id:
                 yield self.tuples_to_data_frame(tuples)
 
+    def emit_state_with_filter(self, state: State) -> 
typing.Iterator[StateFrame]:
+        for receiver, payload in self.partitioner.flush_state(state):
+            if receiver == self.worker_actor_id:
+                yield (
+                    StateFrame(payload)
+                    if isinstance(payload, dict)
+                    else self.tuples_to_data_frame(payload)
+                )
+
     def run(self) -> None:
         """
         Main execution logic that reads tuples from the materialized storage 
and
@@ -138,8 +148,21 @@ class InputPortMaterializationReaderRunnable(Runnable, 
Stoppable):
                 self.uri
             )
             self.emit_ecm("StartChannel", 
EmbeddedControlMessageType.NO_ALIGNMENT)
-            storage_iterator = self.materialization.get()
 
+            try:
+                state_document, _ = DocumentFactory.open_document(
+                    state_uri_from_result_uri(self.uri)
+                )
+                state_iterator = state_document.get()
+                for state in state_iterator:
+                    for state_frame in self.emit_state_with_filter(
+                        deserialize_state(state)
+                    ):
+                        self.emit_payload(state_frame)
+            except ValueError:
+                pass
+
+            storage_iterator = self.materialization.get()
             # Iterate and process tuples.
             for tup in storage_iterator:
                 if self._stopped:
diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py 
b/amber/src/main/python/core/storage/vfs_uri_factory.py
index de0c5db56e..0e23e60705 100644
--- a/amber/src/main/python/core/storage/vfs_uri_factory.py
+++ b/amber/src/main/python/core/storage/vfs_uri_factory.py
@@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
     RESULT = "result"
     RUNTIME_STATISTICS = "runtimeStatistics"
     CONSOLE_MESSAGES = "consoleMessages"
+    STATE = "state"
 
 
 class VFSURIFactory:
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 4ab3d18056..53755b780c 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -124,6 +124,8 @@ class OutputManager(
       : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
+  private val storageUris: mutable.HashMap[Int, URI] = mutable.HashMap()
+
   /**
     * Add down stream operator and its corresponding Partitioner.
     *
@@ -232,6 +234,23 @@ class OutputManager(
     })
   }
 
+  def saveStateToStorageIfNeeded(state: State): Unit = {
+    try {
+      storageUris.foreach {
+        case (_, uri) =>
+          val writer = DocumentFactory
+            .openDocument(State.stateUriFromResultUri(uri))
+            ._1
+            .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+            .asInstanceOf[BufferedItemWriter[Tuple]]
+          writer.putOne(State.serialize(state))
+          writer.close()
+      }
+    } catch {
+      case _: Exception => ()
+    }
+  }
+
   /**
     * Singal the port storage writer to flush the remaining buffer and wait 
for commits to finish so that
     * the output port is properly completed. If the output port does not need 
storage, no action will be done.
@@ -280,6 +299,7 @@ class OutputManager(
   }
 
   private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: 
URI): Unit = {
+    this.storageUris(portId.id) = storageUri
     val bufferedItemWriter = DocumentFactory
       .openDocument(storageUri)
       ._1
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
index 6618e857b1..cfdb6a82f8 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
@@ -21,6 +21,7 @@ package 
org.apache.texera.amber.engine.architecture.pythonworker
 
 import com.twitter.util.{Await, Promise}
 import org.apache.texera.amber.core.WorkflowRuntimeException
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.tuple.{Schema, Tuple}
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
 import 
org.apache.texera.amber.engine.architecture.pythonworker.WorkerBatchInternalQueue.{
@@ -125,7 +126,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int], 
val actorId: ActorVirtu
       case DataFrame(frame) =>
         writeArrowStream(mutable.Queue(ArraySeq.unsafeWrapArray(frame): _*), 
from, "Data")
       case StateFrame(state) =>
-        writeArrowStream(mutable.Queue(state.toTuple), from, "State")
+        writeArrowStream(mutable.Queue(State.serialize(state)), from, "State")
     }
   }
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
index c904e436bc..463dc4b75a 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
@@ -128,7 +128,7 @@ private class AmberProducer(
     dataHeader.payloadType match {
       case "State" =>
         assert(root.getRowCount == 1)
-        outputPort.sendTo(to, 
StateFrame(State(Some(ArrowUtils.getTexeraTuple(0, root)))))
+        outputPort.sendTo(to, 
StateFrame(State.deserialize(ArrowUtils.getTexeraTuple(0, root))))
       case "ECM" =>
         assert(root.getRowCount == 1)
         outputPort.sendTo(
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index e490cde3d9..5be5d942e5 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -20,7 +20,8 @@
 package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
-import com.twitter.util.{Future, Return, Throw}
+import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, 
Return, Throw, Timer}
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -61,7 +62,7 @@ import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions
 
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration => ScalaDuration}
 
 /**
   * The executor of a region.
@@ -109,10 +110,14 @@ class RegionExecutionCoordinator(
   private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new 
AtomicReference(
     Unexecuted
   )
+  private val terminationFutureRef: AtomicReference[Future[Unit]] = new 
AtomicReference(null)
+  private val killRetryTimer: Timer = new JavaTimer(true)
+  private val killRetryDelay: TwitterDuration = 
TwitterDuration.fromMilliseconds(200)
 
   /**
     * Sync the status of `RegionExecution` and transition this coordinator's 
phase to `Completed` only when the
-    * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the 
ports of this region are completed.
+    * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the 
ports of this region are completed, and
+    * all workers in this region are terminated.
     *
     * Additionally, this method will also terminate all the workers of this 
region:
     *
@@ -135,12 +140,22 @@ class RegionExecutionCoordinator(
       return Future.Unit
     }
 
-    // Set this coordinator's status to be completed so that subsequent 
regions can be started by
-    // WorkflowExecutionCoordinator.
-    setPhase(Completed)
-
-    // Terminate all the workers in this region.
-    terminateWorkers(regionExecution)
+    val existingTerminationFuture = terminationFutureRef.get
+    if (existingTerminationFuture != null) {
+      existingTerminationFuture
+    } else {
+      val terminationFuture = 
terminateWorkersWithRetry(regionExecution).flatMap { _ =>
+        // Set this coordinator's status to be completed so that subsequent 
regions can be started by
+        // WorkflowExecutionCoordinator.
+        setPhase(Completed)
+        Future.Unit
+      }
+      if (terminationFutureRef.compareAndSet(null, terminationFuture)) {
+        terminationFuture
+      } else {
+        terminationFutureRef.get
+      }
+    }
   }
 
   private def terminateWorkers(regionExecution: RegionExecution) = {
@@ -167,7 +182,7 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the 
worker and send messages.
                 actorRefService.removeActorRef(workerId)
-                gracefulStop(actorRef, Duration(5, 
TimeUnit.SECONDS)).asTwitter()
+                gracefulStop(actorRef, ScalaDuration(5, 
TimeUnit.SECONDS)).asTwitter()
               }
           }.toSeq
 
@@ -191,8 +206,29 @@ class RegionExecutionCoordinator(
     }
   }
 
+  private def terminateWorkersWithRetry(
+      regionExecution: RegionExecution,
+      attempt: Int = 1
+  ): Future[Unit] = {
+    terminateWorkers(regionExecution).rescue { case err =>
+      logger.warn(
+        s"Failed to terminate region ${region.id.id} on attempt $attempt. 
Retrying in ${killRetryDelay.inMilliseconds} ms.",
+        err
+      )
+      Future
+        .sleep(killRetryDelay)(killRetryTimer)
+        .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))
+    }
+  }
+
   def isCompleted: Boolean = currentPhaseRef.get == Completed
 
+  /**
+    * Returns the region termination future if termination has been initiated.
+    * This is only set by `tryCompleteRegionExecution()`.
+    */
+  def getTerminationFutureOpt: Option[Future[Unit]] = 
Option(terminationFutureRef.get)
+
   /**
     * This will sync and transition the region execution phase from one to 
another depending on its current phase:
     *
@@ -528,12 +564,14 @@ class RegionExecutionCoordinator(
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
         val storageUriToAdd = portConfig.storageURI
+        val stateUriToAdd = State.stateUriFromResultUri(storageUriToAdd)
         val (_, eid, _, _) = decodeURI(storageUriToAdd)
         val schemaOptional =
           
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is 
missing"))
         DocumentFactory.createDocument(storageUriToAdd, schema)
+        DocumentFactory.createDocument(stateUriToAdd, State.schema)
         WorkflowExecutionsResource.insertOperatorPortResultUri(
           eid = eid,
           globalPortId = outputPortId,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index 3aa5fa90a4..65c560ee59 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -126,6 +126,7 @@ class DataProcessor(
       val outputState = executor.processState(state, port)
       if (outputState.isDefined) {
         outputManager.emitState(outputState.get)
+        outputManager.saveStateToStorageIfNeeded(state)
       }
     } catch safely {
       case e =>
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 10fbbc44a2..acada743bc 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -21,6 +21,7 @@ package 
org.apache.texera.amber.engine.architecture.worker.managers
 
 import io.grpc.MethodDescriptor
 import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
@@ -45,7 +46,11 @@ import 
org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{
   DPInputQueueElement,
   FIFOMessageElement
 }
-import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, 
WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.ambermessage.{
+  DataFrame,
+  StateFrame,
+  WorkflowFIFOMessage
+}
 import 
org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage
 
 import java.net.URI
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
       }
       // Flush any remaining tuples in the buffer.
       if (buffer.nonEmpty) flush()
+
+      try {
+        val state_document =
+          DocumentFactory
+            .openDocument(State.stateUriFromResultUri(uri))
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+        val stateReadIterator = state_document.get()
+
+        while (stateReadIterator.hasNext) {
+          val state = State.deserialize(stateReadIterator.next())
+          inputMessageQueue.put(
+            FIFOMessageElement(WorkflowFIFOMessage(channelId, 
getSequenceNumber, StateFrame(state)))
+          )
+        }
+      } catch {
+        case _: Exception =>
+      }
+
       emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT)
       isFinished.set(true)
     } catch {
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
index f99739acc0..9837213abb 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala
@@ -29,13 +29,7 @@ trait OperatorExecutor {
 
   def produceStateOnStart(port: Int): Option[State] = None
 
-  def processState(state: State, port: Int): Option[State] = {
-    if (state.isPassToAllDownstream) {
-      Some(state)
-    } else {
-      None
-    }
-  }
+  def processState(state: State, port: Int): Option[State] = Some(state)
 
   def processTupleMultiPort(
       tuple: Tuple,
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 3226c9d2fe..f76a314b7a 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -19,39 +19,70 @@
 
 package org.apache.texera.amber.core.state
 
+import com.fasterxml.jackson.databind.JsonNode
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
 
-import scala.collection.mutable
+import java.net.URI
+import java.util.Base64
+import scala.jdk.CollectionConverters.IteratorHasAsScala
 
-final case class State(tuple: Option[Tuple] = None, passToAllDownstream: 
Boolean = false) {
-  val data: mutable.Map[String, (AttributeType, Any)] = mutable.LinkedHashMap()
-  add("passToAllDownstream", passToAllDownstream, AttributeType.BOOLEAN)
-  if (tuple.isDefined) {
-    tuple.get.getSchema.getAttributes.foreach { attribute =>
-      add(attribute.getName, tuple.get.getField(attribute.getName), 
attribute.getType)
-    }
-  }
+object State {
+  private val StateContent = "content"
+  private val BytesTypeMarker = "__texera_type__"
+  private val BytesValue = "bytes"
+  private val PayloadMarker = "payload"
 
-  def add(key: String, value: Any, valueType: AttributeType): Unit =
-    data.put(key, (valueType, value))
+  val schema: Schema = new Schema(
+    new Attribute(StateContent, AttributeType.STRING)
+  )
 
-  def get(key: String): Any = data(key)._2
+  def stateUriFromResultUri(resultUri: URI): URI =
+    new URI(resultUri.toString.replace("/result", "/state"))
 
-  def isPassToAllDownstream: Boolean = 
get("passToAllDownstream").asInstanceOf[Boolean]
+  def serialize(state: State): Tuple = {
+    val payloadJson = objectMapper.writeValueAsString(toJsonValue(state))
+    Tuple.builder(schema).addSequentially(Array(payloadJson)).build()
+  }
 
-  def apply(key: String): Any = get(key)
+  def deserialize(tuple: Tuple): State = {
+    val payload = tuple.getField[String](StateContent)
+    objectMapper.readTree(payload).fields().asScala.map(entry => entry.getKey 
-> fromJsonValue(entry.getValue)).toMap
+  }
 
-  def toTuple: Tuple =
-    Tuple
-      .builder(
-        Schema(data.map {
-          case (name, (attrType, _)) =>
-            new Attribute(name, attrType)
-        }.toList)
-      )
-      .addSequentially(data.values.map(_._2).toArray)
-      .build()
+  private def toJsonValue(value: Any): Any =
+    value match {
+      case null => null
+      case bytes: Array[Byte] =>
+        Map(BytesTypeMarker -> BytesValue, PayloadMarker -> 
Base64.getEncoder.encodeToString(bytes))
+      case map: State =>
+        map.iterator.map { case (k, v) => k -> toJsonValue(v) }.toMap
+      case iterable: Iterable[_] =>
+        iterable.map(toJsonValue).toList
+      case other => other
+    }
 
-  override def toString: String =
-    data.map { case (key, (_, value)) => s"$key: $value" }.mkString(", ")
+  private def fromJsonValue(node: JsonNode): Any = {
+    if (node == null || node.isNull) {
+      null
+    } else if (node.isObject) {
+      val fields = node.fields().asScala.map(entry => entry.getKey -> 
entry.getValue).toMap
+      fields.get(BytesTypeMarker) match {
+        case Some(typeNode) if typeNode.isTextual && typeNode.asText() == 
BytesValue =>
+          Base64.getDecoder.decode(fields(PayloadMarker).asText())
+        case _ =>
+          fields.view.mapValues(fromJsonValue).toMap
+      }
+    } else if (node.isArray) {
+      node.elements().asScala.map(fromJsonValue).toList
+    } else if (node.isBoolean) {
+      node.asBoolean()
+    } else if (node.isIntegralNumber) {
+      node.longValue()
+    } else if (node.isFloatingPointNumber) {
+      node.doubleValue()
+    } else {
+      node.asText()
+    }
+  }
 }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
new file mode 100644
index 0000000000..c110f9d814
--- /dev/null
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core
+
+package object state {
+  type State = Map[String, Any]
+}
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 15949ef471..ae37def667 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -72,6 +72,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => 
StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => 
StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => "state"
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType 
is not supported")
         }
@@ -119,6 +120,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => 
StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => 
StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => "state"
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType 
is not supported")
         }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
index 3513ac5ecd..990776a69f 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
@@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration {
   val RESULT: Value = Value("result")
   val RUNTIME_STATISTICS: Value = Value("runtimeStatistics")
   val CONSOLE_MESSAGES: Value = Value("consoleMessages")
+  val STATE: Value = Value("state")
 }
 
 object VFSURIFactory {
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/ifStatement/IfOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/ifStatement/IfOpExec.scala
index 462bdd0969..d2becc79a5 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/ifStatement/IfOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/ifStatement/IfOpExec.scala
@@ -34,7 +34,7 @@ class IfOpExec(descString: String) extends OperatorExecutor {
   //It can accept any value that can be converted to a boolean. For example, 
Int 1 will be converted to true.
   override def processState(state: State, port: Int): Option[State] = {
     outputPort =
-      if (state.get(desc.conditionName).asInstanceOf[Boolean]) PortIdentity(1) 
else PortIdentity()
+      if (state(desc.conditionName).asInstanceOf[Boolean]) PortIdentity(1) 
else PortIdentity()
     Some(state)
   }
 


Reply via email to