This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 538a821fef fix fmt
538a821fef is described below

commit 538a821fef582efefbf143c66434febc1aa9d20f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu May 14 23:57:20 2026 -0700

    fix fmt
---
 .../input_port_materialization_reader_runnable.py      | 18 +++++-------------
 1 file changed, 5 insertions(+), 13 deletions(-)

diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index e3ee27e415..8a7e426a04 100644
--- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -148,19 +148,11 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable):
             )
             self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT)
 
-            # State is broadcast to every downstream worker (no partitioner
-            # filtering, unlike the tuple loop) -- per the design comment
-            # above. Loop-specific: guard with try/except since the state
-            # document may not be provisioned on every materialization in
-            # this branch (the LoopEnd path open-or-creates it).
-            try:
-                state_document, _ = DocumentFactory.open_document(
-                    VFSURIFactory.state_uri(self.uri)
-                )
-                for state_row in state_document.get():
-                    self.emit_payload(StateFrame(State.from_tuple(state_row)))
-            except ValueError:
-                pass
+            state_document, _ = DocumentFactory.open_document(
+                VFSURIFactory.state_uri(self.uri)
+            )
+            for state_row in state_document.get():
+                self.emit_payload(StateFrame(State.from_tuple(state_row)))
 
             storage_iterator = self.materialization.get()
             # Iterate and process tuples.

Reply via email to