This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 007a264b59 refactor(amber): carry loop_counter off State on the
StateFrame envelope
007a264b59 is described below
commit 007a264b59421f4066259b8ee2f36e1a132262d2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Jun 1 16:01:27 2026 -0700
refactor(amber): carry loop_counter off State on the StateFrame envelope
The loop operators mutated the runtime-owned State dict they were handed
(`state["loop_counter"] += 1` / `-= 1`), which a reviewer flagged: loop_counter
is loop-control bookkeeping, not user state, and the runtime -- not the
operator -- owns that dict.
Move loop_counter out of State entirely. It now rides on the StateFrame
transport envelope and is owned by the worker runtime:
main_loop._process_state_frame applies the +1/-1 and handles the
LoopStart/LoopEnd nested pass-through, skipping the operator. The counter is
materialized/serialized as its own column via a new bilingual (Python + Scala) StateStorage
("content" STRING, "loop_counter" LONG) format, so State and its single-column
schema stay pure user content. The generated LoopEnd process_state is now consume-only
[...]
Operators never see or mutate loop_counter. Adds StateStorage (Python +
Scala) and StateStorageSpec, and relocates the counter increment/decrement
coverage from operator-level tests to main_loop runtime tests.
Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
.../core/architecture/packaging/input_manager.py | 6 +-
.../core/architecture/packaging/output_manager.py | 16 +-
amber/src/main/python/core/models/__init__.py | 2 +
amber/src/main/python/core/models/operator.py | 8 +-
amber/src/main/python/core/models/payload.py | 3 +
amber/src/main/python/core/models/state_storage.py | 56 +++++++
amber/src/main/python/core/runnables/main_loop.py | 61 ++++++--
.../main/python/core/runnables/network_receiver.py | 6 +-
.../main/python/core/runnables/network_sender.py | 15 +-
.../input_port_materialization_reader_runnable.py | 12 +-
.../messaginglayer/OutputManager.scala | 4 +-
.../pythonworker/PythonProxyClient.scala | 3 +-
.../scheduling/RegionExecutionCoordinator.scala | 4 +-
.../architecture/packaging/test_output_manager.py | 10 +-
.../packaging/test_state_materialization_e2e.py | 33 ++--
.../test/python/core/models/test_loop_operators.py | 172 ++-------------------
.../test/python/core/runnables/test_main_loop.py | 133 ++++++++++++++--
...t_input_port_materialization_reader_runnable.py | 18 ++-
.../texera/amber/core/state/StateStorage.scala | 56 +++++++
.../texera/amber/core/state/StateStorageSpec.scala | 51 ++++++
.../texera/amber/operator/loop/LoopEndOpDesc.scala | 4 -
.../amber/operator/loop/LoopStartOpDesc.scala | 2 +-
.../amber/operator/loop/LoopEndOpDescSpec.scala | 18 +--
.../amber/operator/loop/LoopStartOpDescSpec.scala | 5 +-
24 files changed, 444 insertions(+), 254 deletions(-)
diff --git a/amber/src/main/python/core/architecture/packaging/input_manager.py
b/amber/src/main/python/core/architecture/packaging/input_manager.py
index 6cb6bdc08c..22b7fe0286 100644
--- a/amber/src/main/python/core/architecture/packaging/input_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/input_manager.py
@@ -155,13 +155,15 @@ class InputManager:
def process_data_payload(
self, from_: ChannelIdentity, payload: DataPayload
- ) -> Iterator[Union[Tuple, InternalMarker]]:
+ ) -> Iterator[Union[Tuple, StateFrame, InternalMarker]]:
self._current_channel_id = from_
if isinstance(payload, DataFrame):
yield from self._process_data(payload.frame)
elif isinstance(payload, StateFrame):
- yield payload.frame
+ # Yield the whole envelope (not just .frame) so the runtime can
+ # read its loop_counter; the operator still receives a bare State.
+ yield payload
else:
raise NotImplementedError()
diff --git
a/amber/src/main/python/core/architecture/packaging/output_manager.py
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index 7ef0ca804a..f2cb502c07 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -41,7 +41,7 @@ from
core.architecture.sendsemantics.range_based_shuffle_partitioner import (
from core.architecture.sendsemantics.round_robin_partitioner import (
RoundRobinPartitioner,
)
-from core.models import Tuple, Schema, StateFrame
+from core.models import Tuple, Schema, StateFrame, StateStorage
from core.models.payload import DataPayload, DataFrame
from core.models.state import State
from core.storage.document_factory import DocumentFactory
@@ -211,14 +211,18 @@ class OutputManager:
PortStorageWriterElement(data_tuple=tuple_)
)
- def save_state_to_storage_if_needed(self, state: State, port_id=None) ->
None:
+ def save_state_to_storage_if_needed(
+ self, state: State, loop_counter: int, port_id=None
+ ) -> None:
# When port_id is omitted the same state row is fanned out to
# every output port's state table. This mirrors the
# broadcast-to-all-workers behavior on the emit side: state is
# shared context, not per-key data, so every downstream operator
# (and every worker reading the materialization) needs the full
# set.
- element = PortStorageWriterElement(data_tuple=state.to_tuple())
+ element = PortStorageWriterElement(
+ data_tuple=StateStorage.to_tuple(state, loop_counter)
+ )
if port_id is None:
for writer_queue, _, _ in self._port_state_writers.values():
writer_queue.put(element)
@@ -234,7 +238,7 @@ class OutputManager:
self._ports[port_id].get_schema(),
)
DocumentFactory.create_document(
- VFSURIFactory.state_uri(storage_uri_base), State.SCHEMA
+ VFSURIFactory.state_uri(storage_uri_base), StateStorage.SCHEMA
)
self.set_up_port_storage_writer(port_id, storage_uri_base)
@@ -311,7 +315,7 @@ class OutputManager:
)
def emit_state(
- self, state: State
+ self, state: State, loop_counter: int
) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]:
return chain(
*(
@@ -319,7 +323,7 @@ class OutputManager:
(
receiver,
(
- StateFrame(payload)
+ StateFrame(payload, loop_counter=loop_counter)
if isinstance(payload, State)
else self.tuple_to_frame(payload)
),
diff --git a/amber/src/main/python/core/models/__init__.py
b/amber/src/main/python/core/models/__init__.py
index d24fe0a277..870944982e 100644
--- a/amber/src/main/python/core/models/__init__.py
+++ b/amber/src/main/python/core/models/__init__.py
@@ -26,6 +26,7 @@ from .table import Table, TableLike
from .batch import Batch, BatchLike
from .schema import AttributeType, Field, Schema
from .state import State
+from .state_storage import StateStorage
from .operator import (
Operator,
TableOperator,
@@ -65,4 +66,5 @@ __all__ = [
"Field",
"Schema",
"State",
+ "StateStorage",
]
diff --git a/amber/src/main/python/core/models/operator.py
b/amber/src/main/python/core/models/operator.py
index b51e284c55..8781216ea0 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -296,9 +296,11 @@ class TableOperator(TupleOperatorV2):
class LoopStartOperator(TableOperator):
@overrides.final
def process_state(self, state: State, port: int) -> Optional[State]:
- if "LoopStartStateURI" in state:
- state["loop_counter"] += 1
- return state
+ # First-entry only: merge upstream state into self.state. The nested
+ # pass-through (state already carrying LoopStartStateURI) and all
+ # loop_counter bookkeeping are owned by the worker runtime
+ # (main_loop._process_state_frame), so this operator never sees the
+ # counter and never mutates the State it is handed.
self.state.update(state)
return None
diff --git a/amber/src/main/python/core/models/payload.py
b/amber/src/main/python/core/models/payload.py
index 61a3329488..c2de13c712 100644
--- a/amber/src/main/python/core/models/payload.py
+++ b/amber/src/main/python/core/models/payload.py
@@ -34,3 +34,6 @@ class DataFrame(DataPayload):
@dataclass
class StateFrame(DataPayload):
frame: State
+ # Loop-control bookkeeping owned by the worker runtime, carried alongside
+ # the State payload (not inside it). Defaults to 0 for all non-loop state.
+ loop_counter: int = 0
diff --git a/amber/src/main/python/core/models/state_storage.py
b/amber/src/main/python/core/models/state_storage.py
new file mode 100644
index 0000000000..f880f03583
--- /dev/null
+++ b/amber/src/main/python/core/models/state_storage.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from .schema import Schema
+from .state import State
+from .tuple import Tuple
+
+
+class StateStorage:
+ """Two-column wire/storage format for a ``State`` and its ``loop_counter``.
+
+ ``loop_counter`` is loop-control bookkeeping owned by the worker runtime,
+ not part of the user ``State``. In memory it rides on the ``StateFrame``
+ envelope; whenever state is serialized (network) or materialized (the state
+ storage table) it is written as its own ``loop_counter`` column parallel to
+ ``content`` so it never enters the user state JSON.
+
+ This is the single source of truth for the two-column layout. The Scala
+ ``StateStorage`` object must stay byte-for-byte in sync (same column names,
+ order, and types), since the same state table is written/read by both.
+ """
+
+ CONTENT = "content"
+ LOOP_COUNTER = "loop_counter"
+ SCHEMA = Schema(raw_schema={CONTENT: "STRING", LOOP_COUNTER: "LONG"})
+
+ @staticmethod
+ def to_tuple(state: State, loop_counter: int) -> Tuple:
+ return Tuple(
+ {
+ StateStorage.CONTENT: state.to_json(),
+ StateStorage.LOOP_COUNTER: int(loop_counter),
+ },
+ schema=StateStorage.SCHEMA,
+ )
+
+ @staticmethod
+ def from_tuple(row: Tuple) -> "tuple[State, int]":
+ return (
+ State.from_json(row[StateStorage.CONTENT]),
+ int(row[StateStorage.LOOP_COUNTER]),
+ )
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index 4e3a56d8a1..19fe12d6dc 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -29,6 +29,8 @@ from core.architecture.rpc.async_rpc_client import
AsyncRPCClient
from core.architecture.rpc.async_rpc_server import AsyncRPCServer
from core.models import (
InternalQueue,
+ StateFrame,
+ StateStorage,
Tuple,
)
from core.models.internal_marker import StartChannel, EndChannel
@@ -119,8 +121,10 @@ class MainLoop(StoppableQueueBlockingRunnable):
# user-visible loop state is written back to LoopStart's input.
for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
state.pop(key, None)
- writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
- writer.put_one(State(state).to_tuple())
+ writer = DocumentFactory.create_document(uri,
StateStorage.SCHEMA).writer("0")
+ # The back-edge fires only after the matching LoopEnd consumed at
+ # loop_counter == 0, so the next iteration's input starts at depth 0.
+ writer.put_one(StateStorage.to_tuple(State(state), 0))
writer.close()
def complete(self) -> None:
@@ -228,7 +232,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
output_tuple
)
- def process_input_state(self) -> None:
+ def process_input_state(self, output_loop_counter: int = 0) -> None:
self._switch_context()
output_state = self.context.state_processing_manager.get_output_state()
if output_state is not None:
@@ -237,16 +241,19 @@ class MainLoop(StoppableQueueBlockingRunnable):
self.context.output_manager.reset_storage()
elif isinstance(executor, LoopStartOperator):
self._attach_loop_start_id(output_state)
- for to, batch in
self.context.output_manager.emit_state(output_state):
- self._output_queue.put(
- DataElement(
- tag=ChannelIdentity(
- ActorVirtualIdentity(self.context.worker_id), to,
False
- ),
- payload=batch,
- )
+ self._emit_and_save_state(output_state, output_loop_counter)
+
+ def _emit_and_save_state(self, state: State, loop_counter: int) -> None:
+ for to, batch in self.context.output_manager.emit_state(state,
loop_counter):
+ self._output_queue.put(
+ DataElement(
+ tag=ChannelIdentity(
+ ActorVirtualIdentity(self.context.worker_id), to, False
+ ),
+ payload=batch,
)
-
self.context.output_manager.save_state_to_storage_if_needed(output_state)
+ )
+ self.context.output_manager.save_state_to_storage_if_needed(state,
loop_counter)
def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
"""
@@ -289,9 +296,29 @@ class MainLoop(StoppableQueueBlockingRunnable):
self.process_input_tuple()
self._check_and_process_control()
- def _process_state(self, state_: State) -> None:
- self.context.state_processing_manager.current_input_state = state_
- self.process_input_state()
+ def _process_state_frame(self, frame: StateFrame) -> None:
+ # The runtime owns loop_counter; loop operators never see or mutate it.
+ # The LoopStart/LoopEnd nested pass-through branches are handled here
--
+ # forwarding the state and skipping the operator -- so the operator's
+ # process_state only ever runs the first-entry / consume path.
+ state = frame.frame
+ in_counter = frame.loop_counter
+ executor = self.context.executor_manager.executor
+
+ if isinstance(executor, LoopEndOperator) and in_counter > 0:
+ # State belongs to an outer loop: step one level out and forward.
+ self._emit_and_save_state(state, in_counter - 1)
+ self._check_and_process_control()
+ return
+ if isinstance(executor, LoopStartOperator) and "LoopStartStateURI" in
state:
+ # Outer loop's state flowing through an inner LoopStart: step one
+ # level deeper and forward.
+ self._emit_and_save_state(state, in_counter + 1)
+ self._check_and_process_control()
+ return
+
+ self.context.state_processing_manager.current_input_state = state
+ self.process_input_state(output_loop_counter=in_counter)
self._check_and_process_control()
def _process_start_channel(self) -> None:
@@ -460,8 +487,8 @@ class MainLoop(StoppableQueueBlockingRunnable):
element,
Tuple,
self._process_tuple,
- State,
- self._process_state,
+ StateFrame,
+ self._process_state_frame,
)
except Exception as err:
logger.exception(err)
diff --git a/amber/src/main/python/core/runnables/network_receiver.py
b/amber/src/main/python/core/runnables/network_receiver.py
index 8ba4fbe147..2ddf493926 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -34,6 +34,7 @@ from core.models import (
DataFrame,
State,
StateFrame,
+ StateStorage,
)
from core.models.internal_queue import (
DataElement,
@@ -96,7 +97,10 @@ class NetworkReceiver(Runnable, Stoppable):
"Data",
lambda _: DataFrame(table),
"State",
- lambda _:
StateFrame(State.from_json(table[State.CONTENT][0].as_py())),
+ lambda _: StateFrame(
+ State.from_json(table[StateStorage.CONTENT][0].as_py()),
+
loop_counter=int(table[StateStorage.LOOP_COUNTER][0].as_py()),
+ ),
"ECM",
lambda _:
EmbeddedControlMessage().parse(table["payload"][0].as_py()),
)
diff --git a/amber/src/main/python/core/runnables/network_sender.py
b/amber/src/main/python/core/runnables/network_sender.py
index d8e3889ac1..16a4e5c7a5 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -20,7 +20,13 @@ from loguru import logger
from overrides import overrides
from typing import Optional
-from core.models import DataPayload, InternalQueue, DataFrame, State,
StateFrame
+from core.models import (
+ DataPayload,
+ InternalQueue,
+ DataFrame,
+ StateFrame,
+ StateStorage,
+)
from core.models.internal_queue import (
InternalQueueElement,
DataElement,
@@ -100,8 +106,11 @@ class NetworkSender(StoppableQueueBlockingRunnable):
elif isinstance(data_payload, StateFrame):
data_header = PythonDataHeader(tag=to, payload_type="State")
table = pa.Table.from_pydict(
- {State.CONTENT: [data_payload.frame.to_json()]},
- schema=State.SCHEMA.as_arrow_schema(),
+ {
+ StateStorage.CONTENT: [data_payload.frame.to_json()],
+ StateStorage.LOOP_COUNTER:
[int(data_payload.loop_counter)],
+ },
+ schema=StateStorage.SCHEMA.as_arrow_schema(),
)
self._proxy_client.send_data(bytes(data_header), table)
else:
diff --git
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index 3e0e2d48ab..04390e6d2d 100644
---
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -34,7 +34,14 @@ from
core.architecture.sendsemantics.range_based_shuffle_partitioner import (
from core.architecture.sendsemantics.round_robin_partitioner import (
RoundRobinPartitioner,
)
-from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State,
StateFrame
+from core.models import (
+ Tuple,
+ InternalQueue,
+ DataFrame,
+ DataPayload,
+ StateFrame,
+ StateStorage,
+)
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
from core.storage.vfs_uri_factory import VFSURIFactory
@@ -152,7 +159,8 @@ class InputPortMaterializationReaderRunnable(Runnable,
Stoppable):
VFSURIFactory.state_uri(self.uri)
)
for state_row in state_document.get():
- self.emit_payload(StateFrame(State.from_tuple(state_row)))
+ state, loop_counter = StateStorage.from_tuple(state_row)
+ self.emit_payload(StateFrame(state, loop_counter=loop_counter))
storage_iterator = self.materialization.get()
# Iterate and process tuples.
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 030fa3a3bb..12d48987f8 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -19,7 +19,7 @@
package org.apache.texera.amber.engine.architecture.messaginglayer
-import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.state.{State, StateStorage}
import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.model.BufferedItemWriter
import org.apache.texera.amber.core.tuple._
@@ -242,7 +242,7 @@ class OutputManager(
// emit side: state is shared context, not per-key data, so every
// downstream operator (and every worker reading the materialization)
// needs the full set.
- stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple)))
+
stateWriterThreads.values.foreach(_.queue.put(Left(StateStorage.toTuple(state,
0L))))
}
/**
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
index 6618e857b1..3f56ddfd30 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
@@ -36,6 +36,7 @@ import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.ReturnInvocation
import org.apache.texera.amber.engine.common.AmberLogging
import org.apache.texera.amber.engine.common.actormessage.{ActorCommand,
PythonActorMessage}
+import org.apache.texera.amber.core.state.StateStorage
import org.apache.texera.amber.engine.common.ambermessage._
import org.apache.texera.amber.util.ArrowUtils
import org.apache.arrow.flight._
@@ -125,7 +126,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int],
val actorId: ActorVirtu
case DataFrame(frame) =>
writeArrowStream(mutable.Queue(ArraySeq.unsafeWrapArray(frame): _*),
from, "Data")
case StateFrame(state) =>
- writeArrowStream(mutable.Queue(state.toTuple), from, "State")
+ writeArrowStream(mutable.Queue(StateStorage.toTuple(state, 0L)), from,
"State")
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 6d083fa6d6..9ccc0f62e7 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.pekko.pattern.gracefulStop
import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer,
Return, Throw, Timer}
-import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.state.StateStorage
import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -585,7 +585,7 @@ class RegionExecutionCoordinator(
DocumentFactory.createDocument(resultURI, schema)
}
if (!isLoopEndRegion || !DocumentFactory.documentExists(stateURI)) {
- DocumentFactory.createDocument(stateURI, State.schema)
+ DocumentFactory.createDocument(stateURI, StateStorage.schema)
}
if (!isRestart) {
val (_, eid, _, _) = decodeURI(resultURI)
diff --git
a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
index dcf7ccde67..95e03dca63 100644
--- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py
+++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py
@@ -49,15 +49,15 @@ class TestSaveStateToStorageIfNeeded:
@pytest.fixture
def state(self):
- return State({"loop_counter": 1, "i": 2})
+ return State({"i": 2})
def test_no_state_writers_is_a_noop(self, output_manager, state):
# With no port set up, save_state_to_storage_if_needed must not
# touch any writer.
- output_manager.save_state_to_storage_if_needed(state) # no-op
+ output_manager.save_state_to_storage_if_needed(state, 0) # no-op
def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a):
- output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+ output_manager.save_state_to_storage_if_needed(state, 0,
port_id=port_a)
# No assertion needed -- the absence of any writer means nothing
# was attempted.
@@ -67,7 +67,7 @@ class TestSaveStateToStorageIfNeeded:
queue_a, _, _ = _stub_state_writer(output_manager, port_a)
queue_b, _, _ = _stub_state_writer(output_manager, port_b)
- output_manager.save_state_to_storage_if_needed(state)
+ output_manager.save_state_to_storage_if_needed(state, 0)
# Each port's writer queue receives one PortStorageWriterElement.
# Critically, save is non-blocking -- the call must not invoke
@@ -84,7 +84,7 @@ class TestSaveStateToStorageIfNeeded:
queue_a, _, _ = _stub_state_writer(output_manager, port_a)
queue_b, _, _ = _stub_state_writer(output_manager, port_b)
- output_manager.save_state_to_storage_if_needed(state, port_id=port_a)
+ output_manager.save_state_to_storage_if_needed(state, 0,
port_id=port_a)
assert queue_a.put.call_count == 1
queue_b.put.assert_not_called()
diff --git
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
index 9d1fd30698..eb2a96b1e7 100644
---
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
+++
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
@@ -43,7 +43,7 @@ import pytest
from pyiceberg.catalog.sql import SqlCatalog
from core.architecture.packaging.output_manager import OutputManager
-from core.models import State, StateFrame
+from core.models import State, StateFrame, StateStorage
from core.models.internal_queue import DataElement, InternalQueue
from core.storage.document_factory import DocumentFactory
from core.storage.iceberg.iceberg_catalog_instance import
IcebergCatalogInstance
@@ -161,7 +161,9 @@ def
test_state_written_by_output_manager_is_replayed_by_reader():
DocumentFactory.create_document(
VFSURIFactory.result_uri(base_uri), worker_schema_for_result
)
- DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri),
State.SCHEMA)
+ DocumentFactory.create_document(
+ VFSURIFactory.state_uri(base_uri), StateStorage.SCHEMA
+ )
# 2. Producer side: spin up an OutputManager, set up real state +
# result writer threads against the iceberg storage.
@@ -170,9 +172,10 @@ def
test_state_written_by_output_manager_is_replayed_by_reader():
port_id, schema=worker_schema_for_result, storage_uri_base=base_uri
)
- # 3. Drive a state through the producer-side path.
- state = State({"flag": True, "loop_counter": 7, "name": "outer"})
- producer.save_state_to_storage_if_needed(state)
+ # 3. Drive a state through the producer-side path. loop_counter rides
+ # alongside the State (not inside it) and is materialized as its own
column.
+ state = State({"flag": True, "name": "outer"})
+ producer.save_state_to_storage_if_needed(state, 7)
# 4. Force the writer threads to flush + commit by closing them.
# Without this, the iceberg buffer holds the state in memory and
@@ -213,19 +216,23 @@ def
test_state_written_by_output_manager_is_replayed_by_reader():
assert reader.finished(), "reader exited but did not mark itself finished"
# 6. Drain the consumer's queue and find the StateFrame(s).
- state_frames: list[State] = []
+ state_frames: list[StateFrame] = []
while not consumer_queue.is_empty():
elem = consumer_queue.get()
if isinstance(elem, DataElement) and isinstance(elem.payload,
StateFrame):
- state_frames.append(elem.payload.frame)
+ state_frames.append(elem.payload)
assert len(state_frames) == 1, (
f"expected exactly one State to flow through writer→iceberg→reader; "
f"got {len(state_frames)}: {state_frames}"
)
- assert state_frames[0] == state, (
+ assert state_frames[0].frame == state, (
f"replayed state did not match what was written; "
- f"wrote={state}, read={state_frames[0]}"
+ f"wrote={state}, read={state_frames[0].frame}"
+ )
+ assert state_frames[0].loop_counter == 7, (
+ f"loop_counter must round-trip through its own column; "
+ f"got {state_frames[0].loop_counter}"
)
@@ -239,13 +246,15 @@ def test_state_table_persists_across_writer_close():
port_id = PortIdentity(id=0, internal=False)
DocumentFactory.create_document(VFSURIFactory.result_uri(base_uri),
State.SCHEMA)
- DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri),
State.SCHEMA)
+ DocumentFactory.create_document(
+ VFSURIFactory.state_uri(base_uri), StateStorage.SCHEMA
+ )
producer = OutputManager(worker_id="Worker:WF0-test-producer2-main-0")
producer.add_output_port(port_id, schema=State.SCHEMA,
storage_uri_base=base_uri)
state = State({"flag": False, "checkpoint": 42})
- producer.save_state_to_storage_if_needed(state)
+ producer.save_state_to_storage_if_needed(state, 0)
producer.close_port_storage_writers()
# Read directly from the iceberg state document, bypassing the reader.
@@ -255,4 +264,4 @@ def test_state_table_persists_across_writer_close():
f"expected exactly one row in the iceberg state table after the "
f"writer was closed; got {len(rows)} rows"
)
- assert State.from_tuple(rows[0]) == state
+ assert StateStorage.from_tuple(rows[0]) == (state, 0)
diff --git a/amber/src/test/python/core/models/test_loop_operators.py
b/amber/src/test/python/core/models/test_loop_operators.py
index a7d67b421b..edbc029ad7 100644
--- a/amber/src/test/python/core/models/test_loop_operators.py
+++ b/amber/src/test/python/core/models/test_loop_operators.py
@@ -71,7 +71,7 @@ class _StubLoopStart(LoopStartOperator):
self._output_expr = output_expr
def open(self) -> None:
- self.state = {"loop_counter": 0}
+ self.state = {}
exec(self._initialization, {}, self.state)
def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]]:
@@ -97,10 +97,9 @@ class _StubLoopEnd(LoopEndOperator):
self.state = {}
def process_state(self, state: State, port: int) -> Optional[State]:
- loop_counter = int(state.get("loop_counter", 0))
- if loop_counter > 0:
- state["loop_counter"] = loop_counter - 1
- return state
+ # Consume-only, mirroring the simplified codegen: the runtime owns
+ # loop_counter and the nested pass-through branch, so the operator only
+ # ever runs the matching-loop (consume) path.
self.state = dict(state)
self.state["table"] = loads(self.state["table"])
exec(self._update, {}, self.state)
@@ -129,37 +128,10 @@ class TestLoopStartProcessState:
assert result is None, "first-time state must not be forwarded"
assert op.state["upstream_key"] == "v", "state was not merged into
self.state"
- # loop_counter is left at its open() value (0) on first entry.
- assert op.state["loop_counter"] == 0
-
- def test_reentry_state_with_loop_start_uri_increments_loop_counter(self):
- # Re-entry from this LoopStart's own LoopEnd: the state carries
- # LoopStartStateURI, so the base class must INCREMENT
- # loop_counter and PASS THROUGH the state downstream. This is
- # what main_loop's _attach_loop_start_id relies on.
- op = _StubLoopStart()
- op.open()
- incoming = State({"LoopStartStateURI": "vfs:///x", "loop_counter": 0,
"i": 2})
-
- result = op.process_state(incoming, port=0)
-
- assert result is not None, "re-entry state must be returned for
downstream"
- assert result["loop_counter"] == 1
- # The user variable rides along.
- assert result["i"] == 2
-
- def test_reentry_at_nested_loop_counter_bumps_one(self):
- # Nested loop: an outer loop's re-entry state passes through this
- # inner LoopStart with a loop_counter already > 0 (because the
- # outer LoopStart bumped it on its own re-entry first). The
- # invariant is that we only ever +1, never reset.
- op = _StubLoopStart()
- op.open()
- incoming = State({"LoopStartStateURI": "vfs:///outer", "loop_counter":
5})
-
- result = op.process_state(incoming, port=0)
- assert result["loop_counter"] == 6
+ # NOTE: LoopStart re-entry (+1) is owned by the worker runtime now, not the
+ # operator (which only does the first-entry merge above). It and the nested
+ # pass-through are covered in test_main_loop.py::TestLoopCounterRuntime.
# ---------------------------------------------------------------------------
@@ -202,7 +174,9 @@ class TestLoopStartProduceStateOnFinish:
assert produced["i"] == 0
assert produced["acc"] == []
- assert produced["loop_counter"] == 0
+ # loop_counter is no longer seeded into the operator's state; it is
+ # runtime-owned and rides on the StateFrame envelope.
+ assert "loop_counter" not in produced
# ---------------------------------------------------------------------------
@@ -303,126 +277,8 @@ class TestLoopEndMatchingBranch:
# ---------------------------------------------------------------------------
-# Nested loops — LoopEnd pass-through branch
-# ---------------------------------------------------------------------------
-
-
-class TestLoopEndNestedPassThrough:
- def test_loop_counter_positive_decrements_and_passes_state_through(self):
- # When the inner LoopEnd receives state with loop_counter > 0,
- # the state belongs to an OUTER loop. The inner LoopEnd must
- # decrement loop_counter and return the state for downstream
- # routing (which eventually reaches the outer LoopEnd at
- # loop_counter == 0).
- op = _StubLoopEnd(update="i += 1")
- op.state = {"sentinel": "must_not_be_overwritten"}
-
- incoming = State({"loop_counter": 2, "outer_var": "v"})
- result = op.process_state(incoming, port=0)
-
- assert result is not None, "pass-through branch must emit state
downstream"
- assert result["loop_counter"] == 1
- assert result["outer_var"] == "v"
- # The pass-through branch must NOT overwrite self.state — the
- # inner LoopEnd's own matching-loop state from a previous inner
- # iteration must be preserved.
- assert op.state == {"sentinel": "must_not_be_overwritten"}
-
- def test_pass_through_chain_collapses_to_matching_branch_at_zero(self):
- # Walk a state with loop_counter=3 through three levels of
- # nested LoopEnds: each strips one level, and the fourth
- # (loop_counter == 0) is the matching loop that runs the
- # user's update. This pins the depth-symmetric invariant
- # nested-for-loop scheduling depends on.
- from pickle import dumps
-
- outer = _StubLoopEnd(update="i += 10")
- middle = _StubLoopEnd(update="i += 100")
- inner = _StubLoopEnd(update="i += 1000")
- match = _StubLoopEnd(update="i += 1")
-
- state = State(
- {
- "loop_counter": 3,
- "i": 0,
- "table": dumps(Table([Tuple({"v": 1})])),
- }
- )
-
- # Each outer→inner hop decrements once.
- state = outer.process_state(state, port=0)
- assert state["loop_counter"] == 2
- state = middle.process_state(state, port=0)
- assert state["loop_counter"] == 1
- state = inner.process_state(state, port=0)
- assert state["loop_counter"] == 0
- # At loop_counter == 0 the matching LoopEnd consumes the state
- # and runs ITS user update — not any of the pass-through ops'.
- result = match.process_state(state, port=0)
- assert result is None
- assert match.state["i"] == 1, "only the matching LoopEnd's update
should fire"
-
-
-# ---------------------------------------------------------------------------
-# Nested loops — round trip
+# Nested-loop counter behaviour -- LoopStart +1, LoopEnd -1, and the
+# depth-symmetric invariant -- is now owned by the worker runtime (the
+# operators no longer read or mutate loop_counter), so it is covered in
+# test_main_loop.py::TestLoopCounterRuntime rather than here.
# ---------------------------------------------------------------------------
-
-
-class TestNestedLoopRoundTrip:
- def test_outer_then_inner_loop_state_keeps_counters_symmetric(self):
- # Simulate the state flow for one outer iteration that itself
- # triggers one inner iteration:
- #
- # outer LoopStart re-entry → loop_counter 0 → 1
- # inner LoopStart re-entry → loop_counter 1 → 2
- # inner LoopEnd → loop_counter 2 → 1
- # outer LoopEnd → loop_counter 1 → 0 (matching branch)
- #
- # The matching branch on the outer LoopEnd is reached iff every
- # increment is mirrored by exactly one decrement. A bug in
- # either side would land us in the wrong branch.
- outer_start = _StubLoopStart()
- inner_start = _StubLoopStart()
- inner_end = _StubLoopEnd(update="outer_i += 100")
- outer_end = _StubLoopEnd(update="outer_i += 1")
- outer_start.open()
- inner_start.open()
-
- from pickle import dumps
-
- # outer LoopEnd jumped back to outer LoopStart with this state.
- state_in = State(
- {
- "LoopStartStateURI": "vfs:///outer",
- "loop_counter": 0,
- "outer_i": 0,
- "table": dumps(Table([Tuple({"v": 1})])),
- }
- )
-
- # outer LoopStart re-entry: +1
- state_after_outer_start = outer_start.process_state(state_in, port=0)
- assert state_after_outer_start["loop_counter"] == 1
- # inner LoopStart sees the same passing state and +1 again.
- state_after_inner_start = inner_start.process_state(
- state_after_outer_start, port=0
- )
- assert state_after_inner_start["loop_counter"] == 2
- # inner LoopEnd: pass-through branch (-1).
- state_after_inner_end =
inner_end.process_state(state_after_inner_start, port=0)
- assert state_after_inner_end is not None
- assert state_after_inner_end["loop_counter"] == 1
- # outer LoopEnd: pass-through (-1) lands at 0, the matching branch.
- # Now process_state would have to run the matching branch path
- # because loop_counter == 0. To get there we need one more hop:
- result = outer_end.process_state(state_after_inner_end, port=0)
- # NOT yet at 0 — pass-through decrements to 0 and returns. The
- # NEXT hop is the matching branch.
- assert result is not None
- assert result["loop_counter"] == 0
-
- # Final landing on the matching branch consumes the state and
- # runs the outer update (outer_i += 1).
- matching = _StubLoopEnd(update="outer_i += 1")
- assert matching.process_state(result, port=0) is None
- assert matching.state["outer_i"] == 1
diff --git a/amber/src/test/python/core/runnables/test_main_loop.py
b/amber/src/test/python/core/runnables/test_main_loop.py
index c32f45b888..c7980fc3a6 100644
--- a/amber/src/test/python/core/runnables/test_main_loop.py
+++ b/amber/src/test/python/core/runnables/test_main_loop.py
@@ -36,6 +36,7 @@ from core.models.internal_queue import (
DCMElement,
ECMElement,
)
+from core.models.operator import LoopEndOperator, LoopStartOperator
from core.runnables import MainLoop
from core.util import set_one_of
from proto.org.apache.texera.amber.core import (
@@ -1150,7 +1151,9 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state: [(mock_data_output_channel.to_worker_id,
StateFrame(state))],
+ lambda state, loop_counter: [
+ (mock_data_output_channel.to_worker_id, StateFrame(state))
+ ],
)
def fake_switch_context():
@@ -1167,8 +1170,8 @@ class TestMainLoop:
first_state = State({"value": 1})
second_state = State({"value": 41})
- main_loop._process_state(first_state)
- main_loop._process_state(second_state)
+ main_loop._process_state_frame(StateFrame(first_state))
+ main_loop._process_state_frame(StateFrame(second_state))
first_output: DataElement = output_queue.get()
second_output: DataElement = output_queue.get()
@@ -1360,7 +1363,9 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state: [(mock_data_output_channel.to_worker_id,
StateFrame(state))],
+ lambda state, loop_counter: [
+ (mock_data_output_channel.to_worker_id, StateFrame(state))
+ ],
)
def fake_switch_context():
@@ -1377,8 +1382,8 @@ class TestMainLoop:
first_state = State({"value": 1})
second_state = State({"value": 41})
- main_loop._process_state(first_state)
- main_loop._process_state(second_state)
+ main_loop._process_state_frame(StateFrame(first_state))
+ main_loop._process_state_frame(StateFrame(second_state))
first_output: DataElement = output_queue.get()
second_output: DataElement = output_queue.get()
@@ -1414,12 +1419,14 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state: [(mock_data_output_channel.to_worker_id,
StateFrame(state))],
+ lambda state, loop_counter: [
+ (mock_data_output_channel.to_worker_id, StateFrame(state))
+ ],
)
monkeypatch.setattr(
main_loop.context.output_manager,
"save_state_to_storage_if_needed",
- lambda state: saved_states.append(state),
+ lambda state, loop_counter: saved_states.append(state),
)
def fake_switch_context():
@@ -1433,8 +1440,8 @@ class TestMainLoop:
monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
- main_loop._process_state(State({"value": 1}))
- main_loop._process_state(State({"value": 41}))
+ main_loop._process_state_frame(StateFrame(State({"value": 1})))
+ main_loop._process_state_frame(StateFrame(State({"value": 41})))
# Each input state produced one output state, so both must have
# been persisted in order.
@@ -1458,7 +1465,7 @@ class TestMainLoop:
# → `process_input_state` → DataProcessor.process_internal_marker
# (StartChannel) → executor.produce_state_on_start → _set_output_state
# → MainLoop reads output state → emit + save.
- on_start_state = State({"flag": True, "loop_counter": 0})
+ on_start_state = State({"flag": True})
class DummyExecutor:
@staticmethod
@@ -1473,12 +1480,14 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state: [(mock_data_output_channel.to_worker_id,
StateFrame(state))],
+ lambda state, loop_counter: [
+ (mock_data_output_channel.to_worker_id, StateFrame(state))
+ ],
)
monkeypatch.setattr(
main_loop.context.output_manager,
"save_state_to_storage_if_needed",
- lambda state: saved_states.append(state),
+ lambda state, loop_counter: saved_states.append(state),
)
# _send_ecm_to_data_channels touches output_manager state we don't
# set up here; for this test the ECM forwarding is irrelevant -- the
@@ -1519,7 +1528,9 @@ class TestMainLoop:
f"to storage. saved_states={saved_states}"
)
assert saved_states[0]["flag"] is True
- assert saved_states[0]["loop_counter"] == 0
+ # loop_counter is no longer part of the user State; it rides on the
+ # StateFrame envelope / its own materialized column.
+ assert "loop_counter" not in saved_states[0]
assert saved_states[0]["port"] == 0
@pytest.mark.timeout(2)
@@ -1536,17 +1547,17 @@ class TestMainLoop:
monkeypatch.setattr(
main_loop.context.output_manager,
"emit_state",
- lambda state: [],
+ lambda state, loop_counter: [],
)
monkeypatch.setattr(
main_loop.context.output_manager,
"save_state_to_storage_if_needed",
- lambda state: save_calls.append(state),
+ lambda state, loop_counter: save_calls.append(state),
)
# Pretend DataProc consumed the input but produced no output.
monkeypatch.setattr(main_loop, "_switch_context", lambda: None)
- main_loop._process_state(State({"value": 1}))
+ main_loop._process_state_frame(StateFrame(State({"value": 1})))
assert save_calls == []
@@ -1777,3 +1788,91 @@ class TestMainLoop:
assert events[0][1].msg_type == ConsoleMessageType.ERROR
assert "boom-from-executor" in events[0][1].title
assert events[1][1] is PauseType.EXCEPTION_PAUSE
+
+ # -- Loop counter is runtime-owned (relocated from test_loop_operators) ---
+ #
+ # loop_counter is not part of State; it rides on the StateFrame envelope and
+ # the runtime (_process_state_frame) owns the +1/-1. On the nested
+ # pass-through branches the operator must be skipped entirely.
+
+ def _capture_state_emit(self, main_loop, monkeypatch):
+ """Stub emit/save/switch; return (emitted, switched) recorders."""
+ emitted = []
+ switched = []
+ monkeypatch.setattr(main_loop, "_check_and_process_control", lambda:
None)
+ monkeypatch.setattr(main_loop, "_switch_context", lambda:
switched.append(True))
+ monkeypatch.setattr(
+ main_loop.context.output_manager,
+ "emit_state",
+ lambda state, loop_counter: emitted.append((state, loop_counter))
or [],
+ )
+ monkeypatch.setattr(
+ main_loop.context.output_manager,
+ "save_state_to_storage_if_needed",
+ lambda state, loop_counter: None,
+ )
+ return emitted, switched
+
+ def test_loopstart_reentry_increments_counter_and_skips_operator(
+ self, main_loop, monkeypatch
+ ):
+ # A state already carrying LoopStartStateURI is an outer loop's state
+ # passing through this inner LoopStart. The runtime forwards it with
+ # loop_counter + 1 and must NOT invoke the operator.
+ class StubLoopStart(LoopStartOperator):
+ def process_table(self, table, port):
+ yield
+
+ main_loop.context.executor_manager.executor = StubLoopStart()
+ emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
+
+ main_loop._process_state_frame(
+ StateFrame(State({"LoopStartStateURI": "vfs:///x", "i": 5}),
loop_counter=1)
+ )
+
+ assert switched == [], "nested pass-through must not invoke the
operator"
+ assert len(emitted) == 1
+ emitted_state, emitted_counter = emitted[0]
+ assert emitted_counter == 2 # 1 + 1
+ assert emitted_state["i"] == 5
+ assert "loop_counter" not in emitted_state # never leaks into State
+
+ def test_loopend_passthrough_decrements_counter_and_skips_operator(
+ self, main_loop, monkeypatch
+ ):
+ # loop_counter > 0 at a LoopEnd means the state belongs to an outer
+ # loop: the runtime decrements and forwards, skipping the operator.
+ class StubLoopEnd(LoopEndOperator):
+ def condition(self):
+ return False
+
+ main_loop.context.executor_manager.executor = StubLoopEnd()
+ emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
+
+ main_loop._process_state_frame(
+ StateFrame(State({"outer_var": "v"}), loop_counter=2)
+ )
+
+ assert switched == [], "pass-through must not invoke the operator"
+ assert len(emitted) == 1
+ emitted_state, emitted_counter = emitted[0]
+ assert emitted_counter == 1 # 2 - 1
+ assert emitted_state["outer_var"] == "v"
+
+ def test_loopend_consume_invokes_operator_at_counter_zero(
+ self, main_loop, monkeypatch
+ ):
+ # loop_counter == 0 is the matching loop: the runtime runs the operator
+ # (consume) via the context switch. The operator returns None, so no
+ # state is emitted; the loop-back is driven by complete() separately.
+ class StubLoopEnd(LoopEndOperator):
+ def condition(self):
+ return False
+
+ main_loop.context.executor_manager.executor = StubLoopEnd()
+ emitted, switched = self._capture_state_emit(main_loop, monkeypatch)
+
+ main_loop._process_state_frame(StateFrame(State({"i": 0}),
loop_counter=0))
+
+ assert switched == [True], "consume branch must invoke the operator"
+ assert emitted == [], "operator returned None -> nothing emitted"
diff --git
a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
index 5016c2df2f..f8d6e96417 100644
---
a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
+++
b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py
@@ -19,7 +19,7 @@ from unittest.mock import MagicMock, patch
import pytest
-from core.models import State, StateFrame
+from core.models import State, StateFrame, StateStorage
from core.models.internal_queue import DataElement
from core.models.schema import Schema
from core.storage.runnables.input_port_materialization_reader_runnable import (
@@ -60,12 +60,13 @@ class TestRunStateReadingBlock:
return instance
def test_state_rows_are_emitted_as_state_frames(self, runnable):
- state_a = State({"loop_counter": 0})
- state_b = State({"loop_counter": 1})
+ state_a = State({"i": 0})
+ state_b = State({"i": 1})
- # The state document yields opaque tuples; from_tuple deserializes
- # them. Patch from_tuple so we don't have to wire a real
- # serialization.
+ # The state document yields opaque 2-column tuples; StateStorage
+ # .from_tuple deserializes each into (State, loop_counter). Patch it
+ # so we don't have to wire a real serialization. The loop_counter
+ # must be carried onto the emitted StateFrame envelope.
result_doc = MagicMock()
result_doc.get.return_value = iter([]) # No materialized tuples.
state_doc = MagicMock()
@@ -75,13 +76,13 @@ class TestRunStateReadingBlock:
patch(
"core.storage.runnables.input_port_materialization_reader_runnable.DocumentFactory"
) as mock_factory,
- patch.object(State, "from_tuple") as mock_from_tuple,
+ patch.object(StateStorage, "from_tuple") as mock_from_tuple,
):
mock_factory.open_document.side_effect = [
(result_doc, runnable.tuple_schema),
(state_doc, None),
]
- mock_from_tuple.side_effect = [state_a, state_b]
+ mock_from_tuple.side_effect = [(state_a, 0), (state_b, 1)]
runnable.run()
@@ -96,4 +97,5 @@ class TestRunStateReadingBlock:
and isinstance(call.args[0].payload, StateFrame)
]
assert [sf.payload.frame for sf in state_frames] == [state_a, state_b]
+ assert [sf.payload.loop_counter for sf in state_frames] == [0, 1]
assert runnable._finished is True
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateStorage.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateStorage.scala
new file mode 100644
index 0000000000..541359a28c
--- /dev/null
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateStorage.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.state
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema,
Tuple}
+
+/**
+ * Two-column wire/storage format for a [[State]] plus its loop-control
+ * `loop_counter`.
+ *
+ * `loop_counter` is loop bookkeeping owned by the (Python) worker runtime and
+ * is intentionally NOT part of [[State]]. Whenever state is materialized (the
+ * state storage table) or sent over the wire it is written as its own
+ * `loop_counter` column parallel to `content`, so it never enters the user
+ * state JSON. Scala operators never produce a non-zero counter, so the Scala
+ * write paths emit `0`; this object exists so the bilingual state table schema
+ * and tuple layout stay byte-for-byte in sync with the Python `StateStorage`.
+ */
+object StateStorage {
+ val Content = "content"
+ val LoopCounter = "loop_counter"
+
+ val schema: Schema = new Schema(
+ new Attribute(Content, AttributeType.STRING),
+ new Attribute(LoopCounter, AttributeType.LONG)
+ )
+
+ def toTuple(state: State, loopCounter: Long): Tuple =
+ Tuple
+ .builder(schema)
+ .addSequentially(Array(state.toJson, Long.box(loopCounter)))
+ .build()
+
+ def fromTuple(row: Tuple): (State, Long) =
+ (
+ State.fromJson(row.getField[String](Content)),
+ row.getField[java.lang.Long](LoopCounter).toLong
+ )
+}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateStorageSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateStorageSpec.scala
new file mode 100644
index 0000000000..2546646fc6
--- /dev/null
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateStorageSpec.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.state
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+class StateStorageSpec extends AnyFlatSpec {
+
+ "StateStorage" should "round-trip a state and its loop_counter through a
tuple" in {
+ val state = State(Map("i" -> 2L, "name" -> "outer"))
+ val tuple = StateStorage.toTuple(state, 3L)
+ val (decodedState, decodedCounter) = StateStorage.fromTuple(tuple)
+ assert(decodedState == state)
+ assert(decodedCounter == 3L)
+ }
+
+ it should "materialize loop_counter as its own column, never inside content"
in {
+ // The whole point of the off-State design: loop_counter lives in a
+ // sibling column, so the user state JSON in `content` stays clean.
+ val state = State(Map("i" -> 7L))
+ val tuple = StateStorage.toTuple(state, 5L)
+ assert(tuple.getField[String]("content") == state.toJson)
+ assert(!tuple.getField[String]("content").contains("loop_counter"))
+ assert(tuple.getField[java.lang.Long]("loop_counter").toLong == 5L)
+ }
+
+ it should "default an absent counter round-trip to the written value" in {
+ val tuple = StateStorage.toTuple(State(Map.empty), 0L)
+ assert(tuple.getSchema == StateStorage.schema)
+ val (decodedState, decodedCounter) = StateStorage.fromTuple(tuple)
+ assert(decodedState == State(Map.empty))
+ assert(decodedCounter == 0L)
+ }
+}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
index c1a75b56d5..539a01c16d 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala
@@ -76,10 +76,6 @@ class LoopEndOpDesc extends LogicalOp {
|class ProcessLoopEndOperator(LoopEndOperator):
| @overrides
| def process_state(self, state: State, port: int) ->
Optional[State]:
- | loop_counter = int(state.get("loop_counter", 0))
- | if loop_counter > 0:
- | state["loop_counter"] = loop_counter - 1
- | return state
| self.state = dict(state)
| from pickle import loads
| self.state["table"] = loads(self.state["table"])
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
index c6194632fe..8fb312f8fe 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala
@@ -75,7 +75,7 @@ class LoopStartOpDesc extends LogicalOp {
|class ProcessLoopStartOperator(LoopStartOperator):
| @overrides
| def open(self):
- | self.state = {"loop_counter": 0}
+ | self.state = {}
| exec($initialization, {}, self.state)
|
| @overrides
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala
index 0544385973..6e985dc1fa 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala
@@ -75,16 +75,16 @@ class LoopEndOpDescSpec extends AnyFlatSpec with
LoopOpDescSpecMixin {
code should include("def condition(self) -> bool:")
}
- it should "decrement loop_counter and pass state through when loop_counter >
0 (nested-loop case)" in {
- // For nested loops the inner LoopEnd sees state belonging to an
- // outer loop. The generated process_state recognises this by a
- // positive loop_counter and just decrements + returns the state,
- // leaving the actual loop-control work to the outer LoopEnd.
- // Critical for nested-for-loop correctness -- pin its shape.
+ it should "generate a consume-only process_state with no loop_counter
handling" in {
+ // loop_counter is owned by the worker runtime now, not the operator. The
+ // nested-loop pass-through (decrement + forward) happens in
+ // main_loop._process_state_frame before the operator is invoked, so the
+ // generated LoopEnd only ever runs the matching-loop (consume) path and
+ // must not read or mutate loop_counter. Pin the absence so a regression
+ // that re-introduces operator-side counter handling is caught.
val code = desc().generatePythonCode()
- code should include("loop_counter = int(state.get(\"loop_counter\", 0))")
- code should include("if loop_counter > 0:")
- code should include("state[\"loop_counter\"] = loop_counter - 1")
+ code should not include "loop_counter"
+ code should include("self.state = dict(state)")
}
it should "stash state, deserialize the pickled table, and run the decoded
update on the matching-loop branch" in {
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopStartOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopStartOpDescSpec.scala
index 7fb9e41cf2..67f8446541 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopStartOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopStartOpDescSpec.scala
@@ -71,7 +71,10 @@ class LoopStartOpDescSpec extends AnyFlatSpec with
LoopOpDescSpecMixin {
// nothing.
val code = desc(init = "i = 0", out = "table.iloc[i]").generatePythonCode()
code should include("def open(self)")
- code should include("\"loop_counter\": 0")
+ // loop_counter is runtime-owned now and must not be seeded into the
+ // operator's state; open() starts from an empty dict.
+ code should include("self.state = {}")
+ code should not include "loop_counter"
code should include(s"exec(${decodeExpr("i = 0")}, {}, self.state)")
code should include("def process_table(self, table: Table, port: int)")
code should include(s"""exec("output = " + ${decodeExpr("table.iloc[i]")},
{}, self.state)""")