This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-only by this push:
new 808a5e7ca2 fix fmt
808a5e7ca2 is described below
commit 808a5e7ca2c6627db60bbf68920a5554fc2883f2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 19 15:51:47 2026 -0700
fix fmt
---
amber/src/main/python/core/models/operator.py | 32 ----------------------
.../main/python/core/runnables/data_processor.py | 1 -
amber/src/main/python/core/runnables/main_loop.py | 29 --------------------
3 files changed, 62 deletions(-)
diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py
index 5b9672988a..91c5b2cf27 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -291,35 +291,3 @@ class TableOperator(TupleOperatorV2):
time, or None.
"""
yield
-
-
-class LoopStartOperator(TableOperator):
- @overrides.final
- def process_state(self, state: State, port: int) -> Optional[State]:
- if "LoopStartStateURI" in state:
- state["loop_counter"] += 1
- return state
- self.state.update(state)
- return None
-
- @overrides.final
- def produce_state_on_finish(self, port: int) -> State:
- from pickle import dumps
-
- self.state["table"] = dumps(Table(self._TableOperator__table_data[port]))
- return dict(self.state)
-
-
-class LoopEndOperator(TableOperator):
- @overrides.final
- def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
- yield table
-
- @abstractmethod
- def condition(self) -> None:
- pass
-
- def loop_start_id(self) -> str:
- del self.state["table"]
- del self.state["output"]
- return self.state["LoopStartId"]
diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py
index 815e85a644..4399b1a3a2 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -100,7 +100,6 @@ class DataProcessor(Runnable, Stoppable):
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
- self._switch_context()
self._set_output_state(executor.process_state(state, port_id))
except Exception as err:
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index ece5cf8e10..e0104a755d 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -69,7 +69,6 @@ from proto.org.apache.texera.amber.engine.architecture.rpc import (
EmbeddedControlMessage,
AsyncRpcContext,
ControlRequest,
- IterationCompletedRequest,
)
from proto.org.apache.texera.amber.engine.architecture.worker import (
WorkerState,
@@ -96,29 +95,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
- def _attach_loop_start_id(self, output_state: State) -> None:
- if "LoopStartId" in output_state:
- return
- output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
- "-main-0", 1
- )[0]
- output_state["LoopStartStateURI"] = state_uri_from_result_uri(
- self.context.input_manager.get_input_state_result_uri()
- )
-
- def _next_iteration(
- self, executor: LoopEndOperator, controller_interface
- ) -> None:
- controller_interface.iteration_completed(
- IterationCompletedRequest(OperatorIdentity(executor.loop_start_id()))
- )
- uri = executor.state["LoopStartStateURI"]
- del executor.state["LoopStartStateURI"]
- del executor.state["LoopStartId"]
- writer = DocumentFactory.create_document(uri, STATE_SCHEMA).writer("0")
- writer.put_one(serialize_state(executor.state))
- writer.close()
-
def complete(self) -> None:
"""
Complete the DataProcessor, marking state to COMPLETED, and notify the
@@ -223,10 +199,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
output_state = self.context.state_processing_manager.get_output_state()
self._switch_context()
if output_state is not None:
- if isinstance(self.context.executor_manager.executor, LoopEndOperator):
- self.context.output_manager.reset_output_storage()
- if isinstance(self.context.executor_manager.executor, LoopStartOperator):
- self._attach_loop_start_id(output_state)
for to, batch in self.context.output_manager.emit_state(output_state):
self._output_queue.put(
DataElement(
@@ -236,7 +208,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
payload=batch,
)
)
- self.context.output_manager.save_state_to_storage_if_needed(output_state)
def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
"""