This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-materialization 
by this push:
     new df5d347e6b update
df5d347e6b is described below

commit df5d347e6b15055daf1f5c21ba72651d0d8d00b3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 00:11:23 2026 -0700

    update
---
 .../architecture/messaginglayer/OutputManager.scala | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 2af1ccce2c..3b8caa4d67 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -124,7 +124,7 @@ class OutputManager(
       : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
-  private val storageUris: mutable.HashMap[Int, URI] = mutable.HashMap()
+  private val storageUris: mutable.ArrayBuffer[URI] = mutable.ArrayBuffer()
 
   /**
     * Add down stream operator and its corresponding Partitioner.
@@ -236,15 +236,14 @@ class OutputManager(
 
   def saveStateToStorageIfNeeded(state: State): Unit = {
     try {
-      storageUris.foreach {
-        case (_, uri) =>
-          val writer = DocumentFactory
-            .openDocument(State.uriFromResultUri(uri))
-            ._1
-            .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
-            .asInstanceOf[BufferedItemWriter[Tuple]]
-          writer.putOne(state.toTuple)
-          writer.close()
+      storageUris.foreach { uri =>
+        val writer = DocumentFactory
+          .openDocument(State.uriFromResultUri(uri))
+          ._1
+          .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+          .asInstanceOf[BufferedItemWriter[Tuple]]
+        writer.putOne(state.toTuple)
+        writer.close()
       }
     } catch {
       case _: Exception => ()
@@ -299,7 +298,7 @@ class OutputManager(
   }
 
   private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = {
-    this.storageUris(portId.id) = storageUri
+    this.storageUris += storageUri
     val bufferedItemWriter = DocumentFactory
       .openDocument(storageUri)
       ._1

Reply via email to