This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-only by this push:
     new 808a5e7ca2 fix fmt
808a5e7ca2 is described below

commit 808a5e7ca2c6627db60bbf68920a5554fc2883f2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 19 15:51:47 2026 -0700

    fix fmt
---
 amber/src/main/python/core/models/operator.py      | 32 ----------------------
 .../main/python/core/runnables/data_processor.py   |  1 -
 amber/src/main/python/core/runnables/main_loop.py  | 29 --------------------
 3 files changed, 62 deletions(-)

diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py
index 5b9672988a..91c5b2cf27 100644
--- a/amber/src/main/python/core/models/operator.py
+++ b/amber/src/main/python/core/models/operator.py
@@ -291,35 +291,3 @@ class TableOperator(TupleOperatorV2):
             time, or None.
         """
         yield
-
-
-class LoopStartOperator(TableOperator):
-    @overrides.final
-    def process_state(self, state: State, port: int) -> Optional[State]:
-        if "LoopStartStateURI" in state:
-            state["loop_counter"] += 1
-            return state
-        self.state.update(state)
-        return None
-
-    @overrides.final
-    def produce_state_on_finish(self, port: int) -> State:
-        from pickle import dumps
-
-        self.state["table"] = dumps(Table(self._TableOperator__table_data[port]))
-        return dict(self.state)
-
-
-class LoopEndOperator(TableOperator):
-    @overrides.final
-    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
-        yield table
-
-    @abstractmethod
-    def condition(self) -> None:
-        pass
-
-    def loop_start_id(self) -> str:
-        del self.state["table"]
-        del self.state["output"]
-        return self.state["LoopStartId"]
diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py
index 815e85a644..4399b1a3a2 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -100,7 +100,6 @@ class DataProcessor(Runnable, Stoppable):
                 self._context.worker_id,
                 self._context.console_message_manager.print_buf,
             ):
-                self._switch_context()
                 self._set_output_state(executor.process_state(state, port_id))
 
         except Exception as err:
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index ece5cf8e10..e0104a755d 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -69,7 +69,6 @@ from proto.org.apache.texera.amber.engine.architecture.rpc import (
     EmbeddedControlMessage,
     AsyncRpcContext,
     ControlRequest,
-    IterationCompletedRequest,
 )
 from proto.org.apache.texera.amber.engine.architecture.worker import (
     WorkerState,
@@ -96,29 +95,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
             target=self.data_processor.run, daemon=True, name="data_processor_thread"
         ).start()
 
-    def _attach_loop_start_id(self, output_state: State) -> None:
-        if "LoopStartId" in output_state:
-            return
-        output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
-            "-main-0", 1
-        )[0]
-        output_state["LoopStartStateURI"] = state_uri_from_result_uri(
-            self.context.input_manager.get_input_state_result_uri()
-        )
-
-    def _next_iteration(
-        self, executor: LoopEndOperator, controller_interface
-    ) -> None:
-        controller_interface.iteration_completed(
-            IterationCompletedRequest(OperatorIdentity(executor.loop_start_id()))
-        )
-        uri = executor.state["LoopStartStateURI"]
-        del executor.state["LoopStartStateURI"]
-        del executor.state["LoopStartId"]
-        writer = DocumentFactory.create_document(uri, STATE_SCHEMA).writer("0")
-        writer.put_one(serialize_state(executor.state))
-        writer.close()
-
     def complete(self) -> None:
         """
         Complete the DataProcessor, marking state to COMPLETED, and notify the
@@ -223,10 +199,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
         output_state = self.context.state_processing_manager.get_output_state()
         self._switch_context()
         if output_state is not None:
-            if isinstance(self.context.executor_manager.executor, LoopEndOperator):
-                self.context.output_manager.reset_output_storage()
-            if isinstance(self.context.executor_manager.executor, LoopStartOperator):
-                self._attach_loop_start_id(output_state)
             for to, batch in self.context.output_manager.emit_state(output_state):
                 self._output_queue.put(
                     DataElement(
@@ -236,7 +208,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
                         payload=batch,
                     )
                 )
-            self.context.output_manager.save_state_to_storage_if_needed(output_state)
 
     def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
         """

Reply via email to