This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 007a264b59 refactor(amber): carry loop_counter off State on the 
StateFrame envelope
007a264b59 is described below

commit 007a264b59421f4066259b8ee2f36e1a132262d2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Jun 1 16:01:27 2026 -0700

    refactor(amber): carry loop_counter off State on the StateFrame envelope
    
    The loop operators mutated the runtime-owned State dict they were handed 
(`state["loop_counter"] += 1` / `-= 1`), which a reviewer flagged: loop_counter 
is loop-control bookkeeping, not user state, and the runtime -- not the 
operator -- owns that dict.
    
    Move loop_counter out of State entirely. It now rides on the StateFrame 
transport envelope and is owned by the worker runtime: 
main_loop._process_state_frame applies the +1/-1 and handles the 
LoopStart/LoopEnd nested pass-through, skipping the operator. It is 
materialized/serialized as its own column via a new bilingual StateStorage 
("content" STRING, "loop_counter" LONG) format, so State and its single-column 
schema stay pure user content. The generated LoopEnd process_state is now 
consume-only [...]
    
    Operators never see or mutate loop_counter. Adds StateStorage (Python + 
Scala) and StateStorageSpec, and relocates the counter increment/decrement 
coverage from operator-level tests to main_loop runtime tests.
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
 .../core/architecture/packaging/input_manager.py   |   6 +-
 .../core/architecture/packaging/output_manager.py  |  16 +-
 amber/src/main/python/core/models/__init__.py      |   2 +
 amber/src/main/python/core/models/operator.py      |   8 +-
 amber/src/main/python/core/models/payload.py       |   3 +
 amber/src/main/python/core/models/state_storage.py |  56 +++++++
 amber/src/main/python/core/runnables/main_loop.py  |  61 ++++++--
 .../main/python/core/runnables/network_receiver.py |   6 +-
 .../main/python/core/runnables/network_sender.py   |  15 +-
 .../input_port_materialization_reader_runnable.py  |  12 +-
 .../messaginglayer/OutputManager.scala             |   4 +-
 .../pythonworker/PythonProxyClient.scala           |   3 +-
 .../scheduling/RegionExecutionCoordinator.scala    |   4 +-
 .../architecture/packaging/test_output_manager.py  |  10 +-
 .../packaging/test_state_materialization_e2e.py    |  33 ++--
 .../test/python/core/models/test_loop_operators.py | 172 ++-------------------
 .../test/python/core/runnables/test_main_loop.py   | 133 ++++++++++++++--
 ...t_input_port_materialization_reader_runnable.py |  18 ++-
 .../texera/amber/core/state/StateStorage.scala     |  56 +++++++
 .../texera/amber/core/state/StateStorageSpec.scala |  51 ++++++
 .../texera/amber/operator/loop/LoopEndOpDesc.scala |   4 -
 .../amber/operator/loop/LoopStartOpDesc.scala      |   2 +-
 .../amber/operator/loop/LoopEndOpDescSpec.scala    |  18 +--
 .../amber/operator/loop/LoopStartOpDescSpec.scala  |   5 +-
 24 files changed, 444 insertions(+), 254 deletions(-)

diff --git a/amber/src/main/python/core/architecture/packaging/input_manager.py 
b/amber/src/main/python/core/architecture/packaging/input_manager.py
index 6cb6bdc08c..22b7fe0286 100644
--- a/amber/src/main/python/core/architecture/packaging/input_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/input_manager.py
@@ -155,13 +155,15 @@ class InputManager:
 
     def process_data_payload(
         self, from_: ChannelIdentity, payload: DataPayload
-    ) -> Iterator[Union[Tuple, InternalMarker]]:
+    ) -> Iterator[Union[Tuple, StateFrame, InternalMarker]]:
         self._current_channel_id = from_
 
         if isinstance(payload, DataFrame):
             yield from self._process_data(payload.frame)
         elif isinstance(payload, StateFrame):
-            yield payload.frame
+            # Yield the whole envelope (not just .frame) so the runtime can
+            # read its loop_counter; the operator still receives a bare State.
+            yield payload
         else:
             raise NotImplementedError()
 
diff --git 
a/amber/src/main/python/core/architecture/packaging/output_manager.py 
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index 7ef0ca804a..f2cb502c07 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -41,7 +41,7 @@ from 
core.architecture.sendsemantics.range_based_shuffle_partitioner import (
 from core.architecture.sendsemantics.round_robin_partitioner import (
     RoundRobinPartitioner,
 )
-from core.models import Tuple, Schema, StateFrame
+from core.models import Tuple, Schema, StateFrame, StateStorage
 from core.models.payload import DataPayload, DataFrame
 from core.models.state import State
 from core.storage.document_factory import DocumentFactory
@@ -211,14 +211,18 @@ class OutputManager:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
-    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> 
None:
+    def save_state_to_storage_if_needed(
+        self, state: State, loop_counter: int, port_id=None
+    ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
         # broadcast-to-all-workers behavior on the emit side: state is
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        element = PortStorageWriterElement(
+            data_tuple=StateStorage.to_tuple(state, loop_counter)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
@@ -234,7 +238,7 @@ class OutputManager:
             self._ports[port_id].get_schema(),
         )
         DocumentFactory.create_document(
-            VFSURIFactory.state_uri(storage_uri_base), State.SCHEMA
+            VFSURIFactory.state_uri(storage_uri_base), StateStorage.SCHEMA
         )
         self.set_up_port_storage_writer(port_id, storage_uri_base)
 
@@ -311,7 +315,7 @@ class OutputManager:
         )
 
     def emit_state(
-        self, state: State
+        self, state: State, loop_counter: int
     ) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]:
         return chain(
             *(
@@ -319,7 +323,7 @@ class OutputManager:
                     (
                         receiver,
                         (
-                            StateFrame(payload)
+                            StateFrame(payload, loop_counter=loop_counter)
                             if isinstance(payload, State)
                             else self.tuple_to_frame(payload)
                         ),
diff --git a/amber/src/main/python/core/models/__init__.py 
b/amber/src/main/python/core/models/__init__.py
index d24fe0a277..870944982e 100644
--- a/amber/src/main/python/core/models/__init__.py
+++ b/amber/src/main/python/core/models/__init__.py
@@ -26,6 +26,7 @@ from .table import Table, TableLike
 from .batch import Batch, BatchLike
 from .schema import AttributeType, Field, Schema
 from .state import State
+from .state_storage import StateStorage
 from .operator import (
     Operator,
     TableOperator,
@@ -65,4 +66,5 @@ __all__ = [
     "Field",
     "Schema",
     "State",
+    "StateStorage",
 ]
diff --git a/amber/src/main/python/core/models/operator.py 
b/amber/src/main/python/core/models/operator.py
index b51e284c55..8781216ea0 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -296,9 +296,11 @@ class TableOperator(TupleOperatorV2):
 class LoopStartOperator(TableOperator):
     @overrides.final
     def process_state(self, state: State, port: int) -> Optional[State]:
-        if "LoopStartStateURI" in state:
-            state["loop_counter"] += 1
-            return state
+        # First-entry only: merge upstream state into self.state. The nested
+        # pass-through (state already carrying LoopStartStateURI) and all
+        # loop_counter bookkeeping are owned by the worker runtime
+        # (main_loop._process_state_frame), so this operator never sees the
+        # counter and never mutates the State it is handed.
         self.state.update(state)
         return None
 
diff --git a/amber/src/main/python/core/models/payload.py 
b/amber/src/main/python/core/models/payload.py
index 61a3329488..c2de13c712 100644
--- a/amber/src/main/python/core/models/payload.py
+++ b/amber/src/main/python/core/models/payload.py
@@ -34,3 +34,6 @@ class DataFrame(DataPayload):
 @dataclass
 class StateFrame(DataPayload):
     frame: State
+    # Loop-control bookkeeping owned by the worker runtime, carried alongside
+    # the State payload (not inside it). Defaults to 0 for all non-loop state.
+    loop_counter: int = 0
diff --git a/amber/src/main/python/core/models/state_storage.py 
b/amber/src/main/python/core/models/state_storage.py
new file mode 100644
index 0000000000..f880f03583
--- /dev/null
+++ b/amber/src/main/python/core/models/state_storage.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from .schema import Schema
+from .state import State
+from .tuple import Tuple
+
+
+class StateStorage:
+    """Two-column wire/storage format for a ``State`` and its ``loop_counter``.
+
+    ``loop_counter`` is loop-control bookkeeping owned by the worker runtime,
+    not part of the user ``State``. In memory it rides on the ``StateFrame``
+    envelope; whenever state is serialized (network) or materialized (the state
+    storage table) it is written as its own ``loop_counter`` column parallel to
+    ``content`` so it never enters the user state JSON.
+
+    This is the single source of truth for the two-column layout. The Scala
+    ``StateStorage`` object must stay byte-for-byte in sync (same column names,
+    order, and types), since the same state table is written/read by both.
+    """
+
+    CONTENT = "content"
+    LOOP_COUNTER = "loop_counter"
+    SCHEMA = Schema(raw_schema={CONTENT: "STRING", LOOP_COUNTER: "LONG"})
+
+    @staticmethod
+    def to_tuple(state: State, loop_counter: int) -> Tuple:
+        return Tuple(
+            {
+                StateStorage.CONTENT: state.to_json(),
+                StateStorage.LOOP_COUNTER: int(loop_counter),
+            },
+            schema=StateStorage.SCHEMA,
+        )
+
+    @staticmethod
+    def from_tuple(row: Tuple) -> "tuple[State, int]":
+        return (
+            State.from_json(row[StateStorage.CONTENT]),
+            int(row[StateStorage.LOOP_COUNTER]),
+        )
diff --git a/amber/src/main/python/core/runnables/main_loop.py 
b/amber/src/main/python/core/runnables/main_loop.py
index 4e3a56d8a1..19fe12d6dc 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -29,6 +29,8 @@ from core.architecture.rpc.async_rpc_client import 
AsyncRPCClient
 from core.architecture.rpc.async_rpc_server import AsyncRPCServer
 from core.models import (
     InternalQueue,
+    StateFrame,
+    StateStorage,
     Tuple,
 )
 from core.models.internal_marker import StartChannel, EndChannel
@@ -119,8 +121,10 @@ class MainLoop(StoppableQueueBlockingRunnable):
         # user-visible loop state is written back to LoopStart's input.
         for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
             state.pop(key, None)
-        writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
-        writer.put_one(State(state).to_tuple())
+        writer = DocumentFactory.create_document(uri, 
StateStorage.SCHEMA).writer("0")
+        # The back-edge fires only after the matching LoopEnd consumed at
+        # loop_counter == 0, so the next iteration's input starts at depth 0.
+        writer.put_one(StateStorage.to_tuple(State(state), 0))
         writer.close()
 
     def complete(self) -> None:
@@ -228,7 +232,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
                     output_tuple
                 )
 
-    def process_input_state(self) -> None:
+    def process_input_state(self, output_loop_counter: int = 0) -> None:
         self._switch_context()
         output_state = self.context.state_processing_manager.get_output_state()
         if output_state is not None:
@@ -237,16 +241,19 @@ class MainLoop(StoppableQueueBlockingRunnable):
                 self.context.output_manager.reset_storage()
             elif isinstance(executor, LoopStartOperator):
                 self._attach_loop_start_id(output_state)
-            for to, batch in 
self.context.output_manager.emit_state(output_state):
-                self._output_queue.put(
-                    DataElement(
-                        tag=ChannelIdentity(
-                            ActorVirtualIdentity(self.context.worker_id), to, 
False
-                        ),
-                        payload=batch,
-                    )
+            self._emit_and_save_state(output_state, output_loop_counter)
+
+    def _emit_and_save_state(self, state: State, loop_counter: int) -> None:
+        for to, batch in self.context.output_manager.emit_state(state, 
loop_counter):
+            self._output_queue.put(
+                DataElement(
+                    tag=ChannelIdentity(
+                        ActorVirtualIdentity(self.context.worker_id), to, False
+                    ),
+                    payload=batch,
                 )
-            
self.context.output_manager.save_state_to_storage_if_needed(output_state)
+            )
+        self.context.output_manager.save_state_to_storage_if_needed(state, 
loop_counter)
 
     def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
         """
@@ -289,9 +296,29 @@ class MainLoop(StoppableQueueBlockingRunnable):
         self.process_input_tuple()
         self._check_and_process_control()
 
-    def _process_state(self, state_: State) -> None:
-        self.context.state_processing_manager.current_input_state = state_
-        self.process_input_state()
+    def _process_state_frame(self, frame: StateFrame) -> None:
+        # The runtime owns loop_counter; loop operators never see or mutate it.
+        # The LoopStart/LoopEnd nested pass-through branches are handled here 
--
+        # forwarding the state and skipping the operator -- so the operator's
+        # process_state only ever runs the first-entry / consume path.
+        state = frame.frame
+        in_counter = frame.loop_counter
+        executor = self.context.executor_manager.executor
+
+        if isinstance(executor, LoopEndOperator) and in_counter > 0:
+            # State belongs to an outer loop: step one level out and forward.
+            self._emit_and_save_state(state, in_counter - 1)
+            self._check_and_process_control()
+            return
+        if isinstance(executor, LoopStartOperator) and "LoopStartStateURI" in 
state:
+            # Outer loop's state flowing through an inner LoopStart: step one
+            # level deeper and forward.
+            self._emit_and_save_state(state, in_counter + 1)
+            self._check_and_process_control()
+            return
+
+        self.context.state_processing_manager.current_input_state = state
+        self.process_input_state(output_loop_counter=in_counter)
         self._check_and_process_control()
 
     def _process_start_channel(self) -> None:
@@ -460,8 +487,8 @@ class MainLoop(StoppableQueueBlockingRunnable):
                     element,
                     Tuple,
                     self._process_tuple,
-                    State,
-                    self._process_state,
+                    StateFrame,
+                    self._process_state_frame,
                 )
             except Exception as err:
                 logger.exception(err)
diff --git a/amber/src/main/python/core/runnables/network_receiver.py 
b/amber/src/main/python/core/runnables/network_receiver.py
index 8ba4fbe147..2ddf493926 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -34,6 +34,7 @@ from core.models import (
     DataFrame,
     State,
     StateFrame,
+    StateStorage,
 )
 from core.models.internal_queue import (
     DataElement,
@@ -96,7 +97,10 @@ class NetworkReceiver(Runnable, Stoppable):
                 "Data",
                 lambda _: DataFrame(table),
                 "State",
-                lambda _: 
StateFrame(State.from_json(table[State.CONTENT][0].as_py())),
+                lambda _: StateFrame(
+                    State.from_json(table[StateStorage.CONTENT][0].as_py()),
+                    
loop_counter=int(table[StateStorage.LOOP_COUNTER][0].as_py()),
+                ),
                 "ECM",
                 lambda _: 
EmbeddedControlMessage().parse(table["payload"][0].as_py()),
             )
diff --git a/amber/src/main/python/core/runnables/network_sender.py 
b/amber/src/main/python/core/runnables/network_sender.py
index d8e3889ac1..16a4e5c7a5 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -20,7 +20,13 @@ from loguru import logger
 from overrides import overrides
 from typing import Optional
 
-from core.models import DataPayload, InternalQueue, DataFrame, State, 
StateFrame
+from core.models import (
+    DataPayload,
+    InternalQueue,
+    DataFrame,
+    StateFrame,
+    StateStorage,
+)
 from core.models.internal_queue import (
     InternalQueueElement,
     DataElement,
@@ -100,8 +106,11 @@ class NetworkSender(StoppableQueueBlockingRunnable):
         elif isinstance(data_payload, StateFrame):
             data_header = PythonDataHeader(tag=to, payload_type="State")
             table = pa.Table.from_pydict(
-                {State.CONTENT: [data_payload.frame.to_json()]},
-                schema=State.SCHEMA.as_arrow_schema(),
+                {
+                    StateStorage.CONTENT: [data_payload.frame.to_json()],
+                    StateStorage.LOOP_COUNTER: 
[int(data_payload.loop_counter)],
+                },
+                schema=StateStorage.SCHEMA.as_arrow_schema(),
             )
             self._proxy_client.send_data(bytes(data_header), table)
         else:
diff --git 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index 3e0e2d48ab..04390e6d2d 100644
--- 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -34,7 +34,14 @@ from 
core.architecture.sendsemantics.range_based_shuffle_partitioner import (
 from core.architecture.sendsemantics.round_robin_partitioner import (
     RoundRobinPartitioner,
 )
-from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, 
StateFrame
+from core.models import (
+    Tuple,
+    InternalQueue,
+    DataFrame,
+    DataPayload,
+    StateFrame,
+    StateStorage,
+)
 from core.models.internal_queue import DataElement, ECMElement
 from core.storage.document_factory import DocumentFactory
 from core.storage.vfs_uri_factory import VFSURIFactory
@@ -152,7 +159,8 @@ class InputPortMaterializationReaderRunnable(Runnable, 
Stoppable):
                 VFSURIFactory.state_uri(self.uri)
             )
             for state_row in state_document.get():
-                self.emit_payload(StateFrame(State.from_tuple(state_row)))
+                state, loop_counter = StateStorage.from_tuple(state_row)
+                self.emit_payload(StateFrame(state, loop_counter=loop_counter))
 
             storage_iterator = self.materialization.get()
             # Iterate and process tuples.
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 030fa3a3bb..12d48987f8 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -19,7 +19,7 @@
 
 package org.apache.texera.amber.engine.architecture.messaginglayer
 
-import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.state.{State, StateStorage}
 import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
 import org.apache.texera.amber.core.storage.model.BufferedItemWriter
 import org.apache.texera.amber.core.tuple._
@@ -242,7 +242,7 @@ class OutputManager(
     // emit side: state is shared context, not per-key data, so every
     // downstream operator (and every worker reading the materialization)
     // needs the full set.
-    stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple)))
+    
stateWriterThreads.values.foreach(_.queue.put(Left(StateStorage.toTuple(state, 
0L))))
   }
 
   /**
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
index 6618e857b1..3f56ddfd30 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
@@ -36,6 +36,7 @@ import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.ReturnInvocation
 import org.apache.texera.amber.engine.common.AmberLogging
 import org.apache.texera.amber.engine.common.actormessage.{ActorCommand, 
PythonActorMessage}
+import org.apache.texera.amber.core.state.StateStorage
 import org.apache.texera.amber.engine.common.ambermessage._
 import org.apache.texera.amber.util.ArrowUtils
 import org.apache.arrow.flight._
@@ -125,7 +126,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int], 
val actorId: ActorVirtu
       case DataFrame(frame) =>
         writeArrowStream(mutable.Queue(ArraySeq.unsafeWrapArray(frame): _*), 
from, "Data")
       case StateFrame(state) =>
-        writeArrowStream(mutable.Queue(state.toTuple), from, "State")
+        writeArrowStream(mutable.Queue(StateStorage.toTuple(state, 0L)), from, 
"State")
     }
   }
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 6d083fa6d6..9ccc0f62e7 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
 import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, 
Return, Throw, Timer}
-import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.state.StateStorage
 import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -585,7 +585,7 @@ class RegionExecutionCoordinator(
           DocumentFactory.createDocument(resultURI, schema)
         }
         if (!isLoopEndRegion || !DocumentFactory.documentExists(stateURI)) {
-          DocumentFactory.createDocument(stateURI, State.schema)
+          DocumentFactory.createDocument(stateURI, StateStorage.schema)
         }
         if (!isRestart) {
           val (_, eid, _, _) = decodeURI(resultURI)
diff --git 
a/amber/src/test/python/core/architecture/packaging/test_output_manager.py 
b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
index dcf7ccde67..95e03dca63 100644
--- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
+++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
@@ -49,15 +49,15 @@ class TestSaveStateToStorageIfNeeded:
 
     @pytest.fixture
     def state(self):
-        return State({"loop_counter": 1, "i": 2})
+        return State({"i": 2})
 
     def test_no_state_writers_is_a_noop(self, output_manager, state):
         # With no port set up, save_state_to_storage_if_needed must not
         # touch any writer.
-        output_manager.save_state_to_storage_if_needed(state)  # no-op
+        output_manager.save_state_to_storage_if_needed(state, 0)  # no-op
 
     def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a):
-        output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+        output_manager.save_state_to_storage_if_needed(state, 0, 
port_id=port_a)
         # No assertion needed -- the absence of any writer means nothing
         # was attempted.
 
@@ -67,7 +67,7 @@ class TestSaveStateToStorageIfNeeded:
         queue_a, _, _ = _stub_state_writer(output_manager, port_a)
         queue_b, _, _ = _stub_state_writer(output_manager, port_b)
 
-        output_manager.save_state_to_storage_if_needed(state)
+        output_manager.save_state_to_storage_if_needed(state, 0)
 
         # Each port's writer queue receives one PortStorageWriterElement.
         # Critically, save is non-blocking -- the call must not invoke
@@ -84,7 +84,7 @@ class TestSaveStateToStorageIfNeeded:
         queue_a, _, _ = _stub_state_writer(output_manager, port_a)
         queue_b, _, _ = _stub_state_writer(output_manager, port_b)
 
-        output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+        output_manager.save_state_to_storage_if_needed(state, 0, 
port_id=port_a)
 
         assert queue_a.put.call_count == 1
         queue_b.put.assert_not_called()
diff --git 
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
 
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
index 9d1fd30698..eb2a96b1e7 100644
--- 
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
+++ 
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
@@ -43,7 +43,7 @@ import pytest
 from pyiceberg.catalog.sql import SqlCatalog
 
 from core.architecture.packaging.output_manager import OutputManager
-from core.models import State, StateFrame
+from core.models import State, StateFrame, StateStorage
 from core.models.internal_queue import DataElement, InternalQueue
 from core.storage.document_factory import DocumentFactory
 from core.storage.iceberg.iceberg_catalog_instance import 
IcebergCatalogInstance
@@ -161,7 +161,9 @@ def 
test_state_written_by_output_manager_is_replayed_by_reader():
     DocumentFactory.create_document(
         VFSURIFactory.result_uri(base_uri), worker_schema_for_result
     )
-    DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), 
State.SCHEMA)
+    DocumentFactory.create_document(
+        VFSURIFactory.state_uri(base_uri), StateStorage.SCHEMA
+    )
 
     # 2. Producer side: spin up an OutputManager, set up real state +
     # result writer threads against the iceberg storage.
@@ -170,9 +172,10 @@ def 
test_state_written_by_output_manager_is_replayed_by_reader():
         port_id, schema=worker_schema_for_result, storage_uri_base=base_uri
     )
 
-    # 3. Drive a state through the producer-side path.
-    state = State({"flag": True, "loop_counter": 7, "name": "outer"})
-    producer.save_state_to_storage_if_needed(state)
+    # 3. Drive a state through the producer-side path. loop_counter rides
+    # alongside the State (not inside it) and is materialized as its own 
column.
+    state = State({"flag": True, "name": "outer"})
+    producer.save_state_to_storage_if_needed(state, 7)
 
     # 4. Force the writer threads to flush + commit by closing them.
     # Without this, the iceberg buffer holds the state in memory and
@@ -213,19 +216,23 @@ def 
test_state_written_by_output_manager_is_replayed_by_reader():
     assert reader.finished(), "reader exited but did not mark itself finished"
 
     # 6. Drain the consumer's queue and find the StateFrame(s).
-    state_frames: list[State] = []
+    state_frames: list[StateFrame] = []
     while not consumer_queue.is_empty():
         elem = consumer_queue.get()
         if isinstance(elem, DataElement) and isinstance(elem.payload, 
StateFrame):
-            state_frames.append(elem.payload.frame)
+            state_frames.append(elem.payload)
 
     assert len(state_frames) == 1, (
         f"expected exactly one State to flow through writer→iceberg→reader; "
         f"got {len(state_frames)}: {state_frames}"
     )
-    assert state_frames[0] == state, (
+    assert state_frames[0].frame == state, (
         f"replayed state did not match what was written; "
-        f"wrote={state}, read={state_frames[0]}"
+        f"wrote={state}, read={state_frames[0].frame}"
+    )
+    assert state_frames[0].loop_counter == 7, (
+        f"loop_counter must round-trip through its own column; "
+        f"got {state_frames[0].loop_counter}"
     )
 
 
@@ -239,13 +246,15 @@ def test_state_table_persists_across_writer_close():
     port_id = PortIdentity(id=0, internal=False)
 
     DocumentFactory.create_document(VFSURIFactory.result_uri(base_uri), 
State.SCHEMA)
-    DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), 
State.SCHEMA)
+    DocumentFactory.create_document(
+        VFSURIFactory.state_uri(base_uri), StateStorage.SCHEMA
+    )
 
     producer = OutputManager(worker_id="Worker:WF0-test-producer2-main-0")
     producer.add_output_port(port_id, schema=State.SCHEMA, 
storage_uri_base=base_uri)
 
     state = State({"flag": False, "checkpoint": 42})
-    producer.save_state_to_storage_if_needed(state)
+    producer.save_state_to_storage_if_needed(state, 0)
     producer.close_port_storage_writers()
 
     # Read directly from the iceberg state document, bypassing the reader.
@@ -255,4 +264,4 @@ def test_state_table_persists_across_writer_close():
         f"expected exactly one row in the iceberg state table after the "
         f"writer was closed; got {len(rows)} rows"
     )
-    assert State.from_tuple(rows[0]) == state
+    assert StateStorage.from_tuple(rows[0]) == (state, 0)
diff --git a/amber/src/test/python/core/models/test_loop_operators.py 
b/amber/src/test/python/core/models/test_loop_operators.py
index a7d67b421b..edbc029ad7 100644
--- a/amber/src/test/python/core/models/test_loop_operators.py
+++ b/amber/src/test/python/core/models/test_loop_operators.py
@@ -71,7 +71,7 @@ class _StubLoopStart(LoopStartOperator):
         self._output_expr = output_expr
 
     def open(self) -> None:
-        self.state = {"loop_counter": 0}
+        self.state = {}
         exec(self._initialization, {}, self.state)
 
     def process_table(self, table: Table, port: int) -> 
Iterator[Optional[TableLike]]:
@@ -97,10 +97,9 @@ class _StubLoopEnd(LoopEndOperator):
         self.state = {}
 
     def process_state(self, state: State, port: int) -> Optional[State]:
-        loop_counter = int(state.get("loop_counter", 0))
-        if loop_counter > 0:
-            state["loop_counter"] = loop_counter - 1
-            return state
+        # Consume-only, mirroring the simplified codegen: the runtime owns
+        # loop_counter and the nested pass-through branch, so the operator only
+        # ever runs the matching-loop (consume) path.
         self.state = dict(state)
         self.state["table"] = loads(self.state["table"])
         exec(self._update, {}, self.state)
@@ -129,37 +128,10 @@ class TestLoopStartProcessState:
 
         assert result is None, "first-time state must not be forwarded"
         assert op.state["upstream_key"] == "v", "state was not merged into 
self.state"
-        # loop_counter is left at its open() value (0) on first entry.
-        assert op.state["loop_counter"] == 0
-
-    def test_reentry_state_with_loop_start_uri_increments_loop_counter(self):
-        # Re-entry from this LoopStart's own LoopEnd: the state carries
-        # LoopStartStateURI, so the base class must INCREMENT
-        # loop_counter and PASS THROUGH the state downstream. This is
-        # what main_loop's _attach_loop_start_id relies on.
-        op = _StubLoopStart()
-        op.open()
-        incoming = State({"LoopStartStateURI": "vfs:///x", "loop_counter": 0, 
"i": 2})
-
-        result = op.process_state(incoming, port=0)
-
-        assert result is not None, "re-entry state must be returned for 
downstream"
-        assert result["loop_counter"] == 1
-        # The user variable rides along.
-        assert result["i"] == 2
-
-    def test_reentry_at_nested_loop_counter_bumps_one(self):
-        # Nested loop: an outer loop's re-entry state passes through this
-        # inner LoopStart with a loop_counter already > 0 (because the
-        # outer LoopStart bumped it on its own re-entry first). The
-        # invariant is that we only ever +1, never reset.
-        op = _StubLoopStart()
-        op.open()
-        incoming = State({"LoopStartStateURI": "vfs:///outer", "loop_counter": 
5})
-
-        result = op.process_state(incoming, port=0)
 
-        assert result["loop_counter"] == 6
+    # NOTE: LoopStart re-entry (+1) is owned by the worker runtime now, not the
+    # operator (which only does the first-entry merge above). It and the nested
+    # pass-through are covered in test_main_loop.py::TestLoopCounterRuntime.
 
 
 # ---------------------------------------------------------------------------
@@ -202,7 +174,9 @@ class TestLoopStartProduceStateOnFinish:
 
         assert produced["i"] == 0
         assert produced["acc"] == []
-        assert produced["loop_counter"] == 0
+        # loop_counter is no longer seeded into the operator's state; it is
+        # runtime-owned and rides on the StateFrame envelope.
+        assert "loop_counter" not in produced
 
 
 # ---------------------------------------------------------------------------
@@ -303,126 +277,8 @@ class TestLoopEndMatchingBranch:
 
 
 # ---------------------------------------------------------------------------
-# Nested loops — LoopEnd pass-through branch
-# ---------------------------------------------------------------------------
-
-
-class TestLoopEndNestedPassThrough:
-    def test_loop_counter_positive_decrements_and_passes_state_through(self):
-        # When the inner LoopEnd receives state with loop_counter > 0,
-        # the state belongs to an OUTER loop. The inner LoopEnd must
-        # decrement loop_counter and return the state for downstream
-        # routing (which eventually reaches the outer LoopEnd at
-        # loop_counter == 0).
-        op = _StubLoopEnd(update="i += 1")
-        op.state = {"sentinel": "must_not_be_overwritten"}
-
-        incoming = State({"loop_counter": 2, "outer_var": "v"})
-        result = op.process_state(incoming, port=0)
-
-        assert result is not None, "pass-through branch must emit state 
downstream"
-        assert result["loop_counter"] == 1
-        assert result["outer_var"] == "v"
-        # The pass-through branch must NOT overwrite self.state — the
-        # inner LoopEnd's own matching-loop state from a previous inner
-        # iteration must be preserved.
-        assert op.state == {"sentinel": "must_not_be_overwritten"}
-
-    def test_pass_through_chain_collapses_to_matching_branch_at_zero(self):
-        # Walk a state with loop_counter=3 through three levels of
-        # nested LoopEnds: each strips one level, and the fourth
-        # (loop_counter == 0) is the matching loop that runs the
-        # user's update. This pins the depth-symmetric invariant
-        # nested-for-loop scheduling depends on.
-        from pickle import dumps
-
-        outer = _StubLoopEnd(update="i += 10")
-        middle = _StubLoopEnd(update="i += 100")
-        inner = _StubLoopEnd(update="i += 1000")
-        match = _StubLoopEnd(update="i += 1")
-
-        state = State(
-            {
-                "loop_counter": 3,
-                "i": 0,
-                "table": dumps(Table([Tuple({"v": 1})])),
-            }
-        )
-
-        # Each outer→inner hop decrements once.
-        state = outer.process_state(state, port=0)
-        assert state["loop_counter"] == 2
-        state = middle.process_state(state, port=0)
-        assert state["loop_counter"] == 1
-        state = inner.process_state(state, port=0)
-        assert state["loop_counter"] == 0
-        # At loop_counter == 0 the matching LoopEnd consumes the state
-        # and runs ITS user update — not any of the pass-through ops'.
-        result = match.process_state(state, port=0)
-        assert result is None
-        assert match.state["i"] == 1, "only the matching LoopEnd's update 
should fire"
-
-
-# ---------------------------------------------------------------------------
-# Nested loops — round trip
+# Nested-loop counter behaviour -- LoopStart +1, LoopEnd -1, and the
+# depth-symmetric invariant -- is now owned by the worker runtime (the
+# operators no longer read or mutate loop_counter), so it is covered in
+# test_main_loop.py::TestLoopCounterRuntime rather than here.
 # ---------------------------------------------------------------------------
-
-
-class TestNestedLoopRoundTrip:
-    def test_outer_then_inner_loop_state_keeps_counters_symmetric(self):
-        # Simulate the state flow for one outer iteration that itself
-        # triggers one inner iteration:
-        #
-        #   outer LoopStart re-entry  → loop_counter 0 → 1
-        #   inner LoopStart re-entry  → loop_counter 1 → 2
-        #   inner LoopEnd             → loop_counter 2 → 1
-        #   outer LoopEnd             → loop_counter 1 → 0 (matching branch)
-        #
-        # The matching branch on the outer LoopEnd is reached iff every
-        # increment is mirrored by exactly one decrement. A bug in
-        # either side would land us in the wrong branch.
-        outer_start = _StubLoopStart()
-        inner_start = _StubLoopStart()
-        inner_end = _StubLoopEnd(update="outer_i += 100")
-        outer_end = _StubLoopEnd(update="outer_i += 1")
-        outer_start.open()
-        inner_start.open()
-
-        from pickle import dumps
-
-        # outer LoopEnd jumped back to outer LoopStart with this state.
-        state_in = State(
-            {
-                "LoopStartStateURI": "vfs:///outer",
-                "loop_counter": 0,
-                "outer_i": 0,
-                "table": dumps(Table([Tuple({"v": 1})])),
-            }
-        )
-
-        # outer LoopStart re-entry: +1
-        state_after_outer_start = outer_start.process_state(state_in, port=0)
-        assert state_after_outer_start["loop_counter"] == 1
-        # inner LoopStart sees the same passing state and +1 again.
-        state_after_inner_start = inner_start.process_state(
-            state_after_outer_start, port=0
-        )
-        assert state_after_inner_start["loop_counter"] == 2
-        # inner LoopEnd: pass-through branch (-1).
-        state_after_inner_end = 
inner_end.process_state(state_after_inner_start, port=0)
-        assert state_after_inner_end is not None
-        assert state_after_inner_end["loop_counter"] == 1
-        # outer LoopEnd: pass-through (-1) lands at 0, the matching branch.
-        # Now process_state would have to run the matching branch path
-        # because loop_counter == 0. To get there we need one more hop:
-        result = outer_end.process_state(state_after_inner_end, port=0)
-        # NOT yet at 0 — pass-through decrements to 0 and returns. The
-        # NEXT hop is the matching branch.
-        assert result is not None
-        assert result["loop_counter"] == 0
-
-        # Final landing on the matching branch consumes the state and
-        # runs the outer update (outer_i += 1).
-        matching = _StubLoopEnd(update="outer_i += 1")
-        assert matching.process_state(result, port=0) is None
-        assert matching.state["outer_i"] == 1
diff --git a/amber/src/test/python/core/runnables/test_main_loop.py b/amber/src/test/python/core/runnables/test_main_loop.py
index c32f45b888..c7980fc3a6 100644
--- a/amber/src/test/python/core/runnables/test_main_loop.py
+++ b/amber/src/test/python/core/runnables/test_main_loop.py
@@ -36,6 +36,7 @@ from core.models.internal_queue import (
     DCMElement,
     ECMElement,
 )
+from core.models.operator import LoopEndOperator, LoopStartOperator
 from core.runnables import MainLoop
 from core.util import set_one_of
 from proto.org.apache.texera.amber.core import (
@@ -1150,7 +1151,9 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
+            lambda state, loop_counter: [
+                (mock_data_output_channel.to_worker_id, StateFrame(state))
+            ],
         )
 
         def fake_switch_context():
@@ -1167,8 +1170,8 @@ class TestMainLoop:
         first_state = State({"value": 1})
         second_state = State({"value": 41})
 
-        main_loop._process_state(first_state)
-        main_loop._process_state(second_state)
+        main_loop._process_state_frame(StateFrame(first_state))
+        main_loop._process_state_frame(StateFrame(second_state))
 
         first_output: DataElement = output_queue.get()
         second_output: DataElement = output_queue.get()
@@ -1360,7 +1363,9 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
+            lambda state, loop_counter: [
+                (mock_data_output_channel.to_worker_id, StateFrame(state))
+            ],
         )
 
         def fake_switch_context():
@@ -1377,8 +1382,8 @@ class TestMainLoop:
         first_state = State({"value": 1})
         second_state = State({"value": 41})
 
-        main_loop._process_state(first_state)
-        main_loop._process_state(second_state)
+        main_loop._process_state_frame(StateFrame(first_state))
+        main_loop._process_state_frame(StateFrame(second_state))
 
         first_output: DataElement = output_queue.get()
         second_output: DataElement = output_queue.get()
@@ -1414,12 +1419,14 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
+            lambda state, loop_counter: [
+                (mock_data_output_channel.to_worker_id, StateFrame(state))
+            ],
         )
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "save_state_to_storage_if_needed",
-            lambda state: saved_states.append(state),
+            lambda state, loop_counter: saved_states.append(state),
         )
 
         def fake_switch_context():
@@ -1433,8 +1440,8 @@ class TestMainLoop:
 
         monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
 
-        main_loop._process_state(State({"value": 1}))
-        main_loop._process_state(State({"value": 41}))
+        main_loop._process_state_frame(StateFrame(State({"value": 1})))
+        main_loop._process_state_frame(StateFrame(State({"value": 41})))
 
         # Each input state produced one output state, so both must have
         # been persisted in order.
@@ -1458,7 +1465,7 @@ class TestMainLoop:
         # → `process_input_state` → DataProcessor.process_internal_marker
         # (StartChannel) → executor.produce_state_on_start → _set_output_state
         # → MainLoop reads output state → emit + save.
-        on_start_state = State({"flag": True, "loop_counter": 0})
+        on_start_state = State({"flag": True})
 
         class DummyExecutor:
             @staticmethod
@@ -1473,12 +1480,14 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
+            lambda state, loop_counter: [
+                (mock_data_output_channel.to_worker_id, StateFrame(state))
+            ],
         )
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "save_state_to_storage_if_needed",
-            lambda state: saved_states.append(state),
+            lambda state, loop_counter: saved_states.append(state),
         )
         # _send_ecm_to_data_channels touches output_manager state we don't
         # set up here; for this test the ECM forwarding is irrelevant -- the
@@ -1519,7 +1528,9 @@ class TestMainLoop:
             f"to storage. saved_states={saved_states}"
         )
         assert saved_states[0]["flag"] is True
-        assert saved_states[0]["loop_counter"] == 0
+        # loop_counter is no longer part of the user State; it rides on the
+        # StateFrame envelope / its own materialized column.
+        assert "loop_counter" not in saved_states[0]
         assert saved_states[0]["port"] == 0
 
     @pytest.mark.timeout(2)
@@ -1536,17 +1547,17 @@ class TestMainLoop:
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "emit_state",
-            lambda state: [],
+            lambda state, loop_counter: [],
         )
         monkeypatch.setattr(
             main_loop.context.output_manager,
             "save_state_to_storage_if_needed",
-            lambda state: save_calls.append(state),
+            lambda state, loop_counter: save_calls.append(state),
         )
         # Pretend DataProc consumed the input but produced no output.
         monkeypatch.setattr(main_loop, "_switch_context", lambda: None)
 
-        main_loop._process_state(State({"value": 1}))
+        main_loop._process_state_frame(StateFrame(State({"value": 1})))
 
         assert save_calls == []
 
@@ -1777,3 +1788,91 @@ class TestMainLoop:
         assert events[0][1].msg_type == ConsoleMessageType.ERROR
         assert "boom-from-executor" in events[0][1].title
         assert events[1][1] is PauseType.EXCEPTION_PAUSE
+
+    # -- Loop counter is runtime-owned (relocated from test_loop_operators) ---
+    #
+    # loop_counter is not part of State; it rides on the StateFrame envelope and
+    # the runtime (_process_state_frame) owns the +1/-1. On the nested
+    # pass-through branches the operator must be skipped entirely.
+
+    def _capture_state_emit(self, main_loop, monkeypatch):
+        """Stub emit/save/switch; return (emitted, switched) recorders."""
+        emitted = []
+        switched = []
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: 
None)
+        monkeypatch.setattr(main_loop, "_switch_context", lambda: 
switched.append(True))
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state, loop_counter: emitted.append((state, loop_counter)) 
or [],
+        )
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "save_state_to_storage_if_needed",
+            lambda state, loop_counter: None,
+        )
+        return emitted, switched
+
+    def test_loopstart_reentry_increments_counter_and_skips_operator(
+        self, main_loop, monkeypatch
+    ):
+        # A state already carrying LoopStartStateURI is an outer loop's state
+        # passing through this inner LoopStart. The runtime forwards it with
+        # loop_counter + 1 and must NOT invoke the operator.
+        class StubLoopStart(LoopStartOperator):
+            def process_table(self, table, port):
+                yield
+
+        main_loop.context.executor_manager.executor = StubLoopStart()
+        emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
+
+        main_loop._process_state_frame(
+            StateFrame(State({"LoopStartStateURI": "vfs:///x", "i": 5}), 
loop_counter=1)
+        )
+
+        assert switched == [], "nested pass-through must not invoke the 
operator"
+        assert len(emitted) == 1
+        emitted_state, emitted_counter = emitted[0]
+        assert emitted_counter == 2  # 1 + 1
+        assert emitted_state["i"] == 5
+        assert "loop_counter" not in emitted_state  # never leaks into State
+
+    def test_loopend_passthrough_decrements_counter_and_skips_operator(
+        self, main_loop, monkeypatch
+    ):
+        # loop_counter > 0 at a LoopEnd means the state belongs to an outer
+        # loop: the runtime decrements and forwards, skipping the operator.
+        class StubLoopEnd(LoopEndOperator):
+            def condition(self):
+                return False
+
+        main_loop.context.executor_manager.executor = StubLoopEnd()
+        emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
+
+        main_loop._process_state_frame(
+            StateFrame(State({"outer_var": "v"}), loop_counter=2)
+        )
+
+        assert switched == [], "pass-through must not invoke the operator"
+        assert len(emitted) == 1
+        emitted_state, emitted_counter = emitted[0]
+        assert emitted_counter == 1  # 2 - 1
+        assert emitted_state["outer_var"] == "v"
+
+    def test_loopend_consume_invokes_operator_at_counter_zero(
+        self, main_loop, monkeypatch
+    ):
+        # loop_counter == 0 is the matching loop: the runtime runs the operator
+        # (consume) via the context switch. The operator returns None, so no
+        # state is emitted; the loop-back is driven by complete() separately.
+        class StubLoopEnd(LoopEndOperator):
+            def condition(self):
+                return False
+
+        main_loop.context.executor_manager.executor = StubLoopEnd()
+        emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
+
+        main_loop._process_state_frame(StateFrame(State({"i": 0}), 
loop_counter=0))
+
+        assert switched == [True], "consume branch must invoke the operator"
+        assert emitted == [], "operator returned None -> nothing emitted"
diff --git a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
index 5016c2df2f..f8d6e96417 100644
--- a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
+++ b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
@@ -19,7 +19,7 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
-from core.models import State, StateFrame
+from core.models import State, StateFrame, StateStorage
 from core.models.internal_queue import DataElement
 from core.models.schema import Schema
 from core.storage.runnables.input_port_materialization_reader_runnable import (
@@ -60,12 +60,13 @@ class TestRunStateReadingBlock:
         return instance
 
     def test_state_rows_are_emitted_as_state_frames(self, runnable):
-        state_a = State({"loop_counter": 0})
-        state_b = State({"loop_counter": 1})
+        state_a = State({"i": 0})
+        state_b = State({"i": 1})
 
-        # The state document yields opaque tuples; from_tuple deserializes
-        # them. Patch from_tuple so we don't have to wire a real
-        # serialization.
+        # The state document yields opaque 2-column tuples; StateStorage
+        # .from_tuple deserializes each into (State, loop_counter). Patch it
+        # so we don't have to wire a real serialization. The loop_counter
+        # must be carried onto the emitted StateFrame envelope.
         result_doc = MagicMock()
         result_doc.get.return_value = iter([])  # No materialized tuples.
         state_doc = MagicMock()
@@ -75,13 +76,13 @@ class TestRunStateReadingBlock:
             patch(
                 
"core.storage.runnables.input_port_materialization_reader_runnable.DocumentFactory"
             ) as mock_factory,
-            patch.object(State, "from_tuple") as mock_from_tuple,
+            patch.object(StateStorage, "from_tuple") as mock_from_tuple,
         ):
             mock_factory.open_document.side_effect = [
                 (result_doc, runnable.tuple_schema),
                 (state_doc, None),
             ]
-            mock_from_tuple.side_effect = [state_a, state_b]
+            mock_from_tuple.side_effect = [(state_a, 0), (state_b, 1)]
 
             runnable.run()
 
@@ -96,4 +97,5 @@ class TestRunStateReadingBlock:
             and isinstance(call.args[0].payload, StateFrame)
         ]
         assert [sf.payload.frame for sf in state_frames] == [state_a, state_b]
+        assert [sf.payload.loop_counter for sf in state_frames] == [0, 1]
         assert runnable._finished is True
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateStorage.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateStorage.scala
new file mode 100644
index 0000000000..541359a28c
--- /dev/null
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateStorage.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.state
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+
+/**
+  * Two-column wire/storage format for a [[State]] plus its loop-control
+  * `loop_counter`.
+  *
+  * `loop_counter` is loop bookkeeping owned by the (Python) worker runtime and
+  * is intentionally NOT part of [[State]]. Whenever state is materialized (the
+  * state storage table) or sent over the wire it is written as its own
+  * `loop_counter` column parallel to `content`, so it never enters the user
+  * state JSON. Scala operators never produce a non-zero counter, so the Scala
+  * write paths emit `0`; this object exists so the bilingual state table schema
+  * and tuple layout stay byte-for-byte in sync with the Python `StateStorage`.
+  */
+object StateStorage {
+  val Content = "content"
+  val LoopCounter = "loop_counter"
+
+  val schema: Schema = new Schema(
+    new Attribute(Content, AttributeType.STRING),
+    new Attribute(LoopCounter, AttributeType.LONG)
+  )
+
+  def toTuple(state: State, loopCounter: Long): Tuple =
+    Tuple
+      .builder(schema)
+      .addSequentially(Array(state.toJson, Long.box(loopCounter)))
+      .build()
+
+  def fromTuple(row: Tuple): (State, Long) =
+    (
+      State.fromJson(row.getField[String](Content)),
+      row.getField[java.lang.Long](LoopCounter).toLong
+    )
+}
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateStorageSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateStorageSpec.scala
new file mode 100644
index 0000000000..2546646fc6
--- /dev/null
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateStorageSpec.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.state
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+class StateStorageSpec extends AnyFlatSpec {
+
+  "StateStorage" should "round-trip a state and its loop_counter through a 
tuple" in {
+    val state = State(Map("i" -> 2L, "name" -> "outer"))
+    val tuple = StateStorage.toTuple(state, 3L)
+    val (decodedState, decodedCounter) = StateStorage.fromTuple(tuple)
+    assert(decodedState == state)
+    assert(decodedCounter == 3L)
+  }
+
+  it should "materialize loop_counter as its own column, never inside content" 
in {
+    // The whole point of the off-State design: loop_counter lives in a
+    // sibling column, so the user state JSON in `content` stays clean.
+    val state = State(Map("i" -> 7L))
+    val tuple = StateStorage.toTuple(state, 5L)
+    assert(tuple.getField[String]("content") == state.toJson)
+    assert(!tuple.getField[String]("content").contains("loop_counter"))
+    assert(tuple.getField[java.lang.Long]("loop_counter").toLong == 5L)
+  }
+
+  it should "default an absent counter round-trip to the written value" in {
+    val tuple = StateStorage.toTuple(State(Map.empty), 0L)
+    assert(tuple.getSchema == StateStorage.schema)
+    val (decodedState, decodedCounter) = StateStorage.fromTuple(tuple)
+    assert(decodedState == State(Map.empty))
+    assert(decodedCounter == 0L)
+  }
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
index c1a75b56d5..539a01c16d 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
@@ -76,10 +76,6 @@ class LoopEndOpDesc extends LogicalOp {
        |class ProcessLoopEndOperator(LoopEndOperator):
        |    @overrides
        |    def process_state(self, state: State, port: int) -> 
Optional[State]:
-       |      loop_counter = int(state.get("loop_counter", 0))
-       |      if loop_counter > 0:
-       |        state["loop_counter"] = loop_counter - 1
-       |        return state
        |      self.state = dict(state)
        |      from pickle import loads
        |      self.state["table"] = loads(self.state["table"])
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
index c6194632fe..8fb312f8fe 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
@@ -75,7 +75,7 @@ class LoopStartOpDesc extends LogicalOp {
        |class ProcessLoopStartOperator(LoopStartOperator):
        |    @overrides
        |    def open(self):
-       |        self.state = {"loop_counter": 0}
+       |        self.state = {}
        |        exec($initialization, {}, self.state)
        |
        |    @overrides
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala
index 0544385973..6e985dc1fa 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala
@@ -75,16 +75,16 @@ class LoopEndOpDescSpec extends AnyFlatSpec with LoopOpDescSpecMixin {
     code should include("def condition(self) -> bool:")
   }
 
-  it should "decrement loop_counter and pass state through when loop_counter > 
0 (nested-loop case)" in {
-    // For nested loops the inner LoopEnd sees state belonging to an
-    // outer loop. The generated process_state recognises this by a
-    // positive loop_counter and just decrements + returns the state,
-    // leaving the actual loop-control work to the outer LoopEnd.
-    // Critical for nested-for-loop correctness -- pin its shape.
+  it should "generate a consume-only process_state with no loop_counter 
handling" in {
+    // loop_counter is owned by the worker runtime now, not the operator. The
+    // nested-loop pass-through (decrement + forward) happens in
+    // main_loop._process_state_frame before the operator is invoked, so the
+    // generated LoopEnd only ever runs the matching-loop (consume) path and
+    // must not read or mutate loop_counter. Pin the absence so a regression
+    // that re-introduces operator-side counter handling is caught.
     val code = desc().generatePythonCode()
-    code should include("loop_counter = int(state.get(\"loop_counter\", 0))")
-    code should include("if loop_counter > 0:")
-    code should include("state[\"loop_counter\"] = loop_counter - 1")
+    code should not include "loop_counter"
+    code should include("self.state = dict(state)")
   }
 
   it should "stash state, deserialize the pickled table, and run the decoded 
update on the matching-loop branch" in {
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopStartOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopStartOpDescSpec.scala
index 7fb9e41cf2..67f8446541 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopStartOpDescSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopStartOpDescSpec.scala
@@ -71,7 +71,10 @@ class LoopStartOpDescSpec extends AnyFlatSpec with LoopOpDescSpecMixin {
     // nothing.
     val code = desc(init = "i = 0", out = "table.iloc[i]").generatePythonCode()
     code should include("def open(self)")
-    code should include("\"loop_counter\": 0")
+    // loop_counter is runtime-owned now and must not be seeded into the
+    // operator's state; open() starts from an empty dict.
+    code should include("self.state = {}")
+    code should not include "loop_counter"
     code should include(s"exec(${decodeExpr("i = 0")}, {}, self.state)")
     code should include("def process_table(self, table: Table, port: int)")
     code should include(s"""exec("output = " + ${decodeExpr("table.iloc[i]")}, 
{}, self.state)""")

Reply via email to