This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-materialization
by this push:
new df5d347e6b update
df5d347e6b is described below
commit df5d347e6b15055daf1f5c21ba72651d0d8d00b3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 00:11:23 2026 -0700
update
---
.../architecture/messaginglayer/OutputManager.scala | 21 ++++++++++-----------
1 file changed, 10 insertions(+), 11 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 2af1ccce2c..3b8caa4d67 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -124,7 +124,7 @@ class OutputManager(
: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
mutable.HashMap()
- private val storageUris: mutable.HashMap[Int, URI] = mutable.HashMap()
+ private val storageUris: mutable.ArrayBuffer[URI] = mutable.ArrayBuffer()
/**
* Add down stream operator and its corresponding Partitioner.
@@ -236,15 +236,14 @@ class OutputManager(
def saveStateToStorageIfNeeded(state: State): Unit = {
try {
- storageUris.foreach {
- case (_, uri) =>
- val writer = DocumentFactory
- .openDocument(State.uriFromResultUri(uri))
- ._1
- .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
- .asInstanceOf[BufferedItemWriter[Tuple]]
- writer.putOne(state.toTuple)
- writer.close()
+ storageUris.foreach { uri =>
+ val writer = DocumentFactory
+ .openDocument(State.uriFromResultUri(uri))
+ ._1
+ .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+ .asInstanceOf[BufferedItemWriter[Tuple]]
+ writer.putOne(state.toTuple)
+ writer.close()
}
} catch {
case _: Exception => ()
@@ -299,7 +298,7 @@ class OutputManager(
}
private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri:
URI): Unit = {
- this.storageUris(portId.id) = storageUri
+ this.storageUris += storageUri
val bufferedItemWriter = DocumentFactory
.openDocument(storageUri)
._1