This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 3c0c8164e7657234820a6798a42ebaa30d9f6abf
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 23 00:57:09 2026 -0700

    feat: add state materialization support
---
 .../core/architecture/packaging/output_manager.py  |  36 ++++++-
 amber/src/main/python/core/models/state.py         |   4 +
 .../main/python/core/storage/document_factory.py   | 107 ++++++++++++---------
 .../input_port_materialization_reader_runnable.py  |  29 +++++-
 .../main/python/core/storage/vfs_uri_factory.py    |   1 +
 .../messaginglayer/OutputManager.scala             |  20 ++++
 .../scheduling/RegionExecutionCoordinator.scala    |  58 +++++++++--
 .../engine/architecture/worker/DataProcessor.scala |   1 +
 .../InputPortMaterializationReaderThread.scala     |  26 ++++-
 .../org/apache/texera/amber/core/state/State.scala |   4 +
 .../amber/core/storage/DocumentFactory.scala       |   2 +
 .../texera/amber/core/storage/VFSURIFactory.scala  |   1 +
 12 files changed, 226 insertions(+), 63 deletions(-)

diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index afa9127fe6..065b063f7d 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -17,6 +17,7 @@
 
 import threading
 import typing
+import uuid
 from collections import OrderedDict
 from itertools import chain
 from loguru import logger
@@ -43,7 +44,12 @@ from core.architecture.sendsemantics.round_robin_partitioner import (
 )
 from core.models import Tuple, Schema, StateFrame
 from core.models.payload import DataPayload, DataFrame
-from core.models.state import State
+from core.models.state import (
+    State,
+    STATE_SCHEMA,
+    serialize_state,
+    state_uri_from_result_uri,
+)
 from core.storage.document_factory import DocumentFactory
 from core.storage.runnables.port_storage_writer import (
     PortStorageWriter,
@@ -87,6 +93,8 @@ class OutputManager:
             PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
         ] = dict()
 
+        self._storage_uris: typing.Dict[PortIdentity, str] = dict()
+
     def is_missing_output_ports(self):
         """
         This method is only used for ensuring correct region execution.
@@ -126,6 +134,7 @@ class OutputManager:
         Create a separate thread for saving output tuples of a port
         to storage in batch.
         """
+        self._storage_uris[port_id] = storage_uri
         document, _ = DocumentFactory.open_document(storage_uri)
         buffered_item_writer = document.writer(str(get_worker_index(self.worker_id)))
         writer_queue = Queue()
@@ -171,6 +180,31 @@ class OutputManager:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
+    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+        if port_id is None:
+            uris = self._storage_uris.values()
+        elif port_id in self._storage_uris:
+            uris = [self._storage_uris[port_id]]
+        else:
+            return
+
+        for uri in uris:
+            state_uri = state_uri_from_result_uri(uri)
+            try:
+                document = DocumentFactory.open_document(state_uri)[0]
+            except ValueError:
+                document = DocumentFactory.create_document(state_uri, STATE_SCHEMA)
+            writer = document.writer(str(uuid.uuid4()))
+            writer.put_one(serialize_state(state))
+            writer.close()
+
+    def reset_output_storage(self) -> None:
+        port_id = self.get_port_ids()[0]
+        storage_uri = self._storage_uris[port_id]
+        self.close_port_storage_writers()
+        DocumentFactory.create_document(storage_uri, self._ports[port_id].get_schema())
+        self.set_up_port_storage_writer(port_id, storage_uri)
+
     def close_port_storage_writers(self) -> None:
         """
         Flush the buffers of port storage writers and wait for all the
diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index a496d5c41c..e5726cc3c2 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -32,6 +32,10 @@ _BYTES_TYPE = "bytes"
 STATE_SCHEMA = Schema(raw_schema={STATE_CONTENT: "STRING"})
 
 
+def state_uri_from_result_uri(result_uri: str) -> str:
+    return result_uri.replace("/result", "/state")
+
+
 def serialize_state(state: State) -> Tuple:
     return Tuple(
         {
diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py
index 9b686ab66b..8a4d6fe3c5 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -61,30 +61,35 @@ class DocumentFactory:
         if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
 
-            if resource_type in {VFSResourceType.RESULT}:
-                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
-                # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
-                # field name encoding
-                iceberg_schema = amber_schema_to_iceberg_schema(schema)
-
-                create_table(
-                    IcebergCatalogInstance.get_instance(),
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    iceberg_schema,
-                    override_if_exists=True,
-                )
-
-                return IcebergDocument[Tuple](
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    iceberg_schema,
-                    amber_tuples_to_arrow_table,
-                    arrow_table_to_amber_tuples,
-                )
-            else:
-                raise ValueError(f"Resource type {resource_type} is not supported")
+            match resource_type:
+                case VFSResourceType.RESULT:
+                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+                case VFSResourceType.STATE:
+                    namespace = "state"
+                case _:
+                    raise ValueError(f"Resource type {resource_type} is not supported")
+
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+            # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
+            # field name encoding
+            iceberg_schema = amber_schema_to_iceberg_schema(schema)
+
+            create_table(
+                IcebergCatalogInstance.get_instance(),
+                namespace,
+                storage_key,
+                iceberg_schema,
+                override_if_exists=True,
+            )
+
+            return IcebergDocument[Tuple](
+                namespace,
+                storage_key,
+                iceberg_schema,
+                amber_tuples_to_arrow_table,
+                arrow_table_to_amber_tuples,
+            )
+
         else:
             raise NotImplementedError(
                 f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document"
@@ -96,30 +101,36 @@ class DocumentFactory:
         if parsed_uri.scheme == "vfs":
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
 
-            if resource_type in {VFSResourceType.RESULT}:
-                storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
-
-                table = load_table_metadata(
-                    IcebergCatalogInstance.get_instance(),
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                )
-
-                if table is None:
-                    raise ValueError("No storage is found for the given URI")
-
-                amber_schema = Schema(table.schema().as_arrow())
-
-                document = IcebergDocument(
-                    StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
-                    storage_key,
-                    table.schema(),
-                    amber_tuples_to_arrow_table,
-                    arrow_table_to_amber_tuples,
-                )
-                return document, amber_schema
-            else:
-                raise ValueError(f"Resource type {resource_type} is not supported")
+            match resource_type:
+                case VFSResourceType.RESULT:
+                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+                case VFSResourceType.STATE:
+                    namespace = "state"
+                case _:
+                    raise ValueError(f"Resource type {resource_type} is not supported")
+
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+
+            table = load_table_metadata(
+                IcebergCatalogInstance.get_instance(),
+                namespace,
+                storage_key,
+            )
+
+            if table is None:
+                raise ValueError("No storage is found for the given URI")
+
+            amber_schema = Schema(table.schema().as_arrow())
+
+            document = IcebergDocument(
+                namespace,
+                storage_key,
+                table.schema(),
+                amber_tuples_to_arrow_table,
+                arrow_table_to_amber_tuples,
+            )
+            return document, amber_schema
+
         else:
             raise NotImplementedError(
                 f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document"
diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index e49c0316cc..a600f87857 100644
--- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -17,8 +17,8 @@
 
 import typing
 from loguru import logger
-from pyarrow import Table
 from typing import Union
+from pyarrow import Table
 
 from core.architecture.sendsemantics.broad_cast_partitioner import (
     BroadcastPartitioner,
@@ -34,8 +34,9 @@ from core.architecture.sendsemantics.range_based_shuffle_partitioner import (
 from core.architecture.sendsemantics.round_robin_partitioner import (
     RoundRobinPartitioner,
 )
-from core.models import Tuple, InternalQueue, DataFrame, DataPayload
+from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame
 from core.models.internal_queue import DataElement, ECMElement
+from core.models.state import deserialize_state, state_uri_from_result_uri
 from core.storage.document_factory import DocumentFactory
 from core.util import Stoppable, get_one_of
 from core.util.runnable.runnable import Runnable
@@ -125,6 +126,15 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable):
             if receiver == self.worker_actor_id:
                 yield self.tuples_to_data_frame(tuples)
 
+    def emit_state_with_filter(self, state: State) -> typing.Iterator[StateFrame]:
+        for receiver, payload in self.partitioner.flush_state(state):
+            if receiver == self.worker_actor_id:
+                yield (
+                    StateFrame(payload)
+                    if isinstance(payload, dict)
+                    else self.tuples_to_data_frame(payload)
+                )
+
     def run(self) -> None:
         """
         Main execution logic that reads tuples from the materialized storage and
@@ -138,8 +148,21 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable):
                 self.uri
             )
             self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT)
-            storage_iterator = self.materialization.get()
 
+            try:
+                state_document, _ = DocumentFactory.open_document(
+                    state_uri_from_result_uri(self.uri)
+                )
+                state_iterator = state_document.get()
+                for state in state_iterator:
+                    for state_frame in self.emit_state_with_filter(
+                        deserialize_state(state)
+                    ):
+                        self.emit_payload(state_frame)
+            except ValueError:
+                pass
+
+            storage_iterator = self.materialization.get()
             # Iterate and process tuples.
             for tup in storage_iterator:
                 if self._stopped:
diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py b/amber/src/main/python/core/storage/vfs_uri_factory.py
index de0c5db56e..0e23e60705 100644
--- a/amber/src/main/python/core/storage/vfs_uri_factory.py
+++ b/amber/src/main/python/core/storage/vfs_uri_factory.py
@@ -34,6 +34,7 @@ class VFSResourceType(str, Enum):
     RESULT = "result"
     RUNTIME_STATISTICS = "runtimeStatistics"
     CONSOLE_MESSAGES = "consoleMessages"
+    STATE = "state"
 
 
 class VFSURIFactory:
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 4ab3d18056..53755b780c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -124,6 +124,8 @@ class OutputManager(
       : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
+  private val storageUris: mutable.HashMap[Int, URI] = mutable.HashMap()
+
   /**
     * Add down stream operator and its corresponding Partitioner.
     *
@@ -232,6 +234,23 @@ class OutputManager(
     })
   }
 
+  def saveStateToStorageIfNeeded(state: State): Unit = {
+    try {
+      storageUris.foreach {
+        case (_, uri) =>
+          val writer = DocumentFactory
+            .openDocument(State.stateUriFromResultUri(uri))
+            ._1
+            .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+            .asInstanceOf[BufferedItemWriter[Tuple]]
+          writer.putOne(State.serialize(state))
+          writer.close()
+      }
+    } catch {
+      case _: Exception => ()
+    }
+  }
+
   /**
     * Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that
     * the output port is properly completed. If the output port does not need storage, no action will be done.
@@ -280,6 +299,7 @@ class OutputManager(
   }
 
   private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = {
+    this.storageUris(portId.id) = storageUri
     val bufferedItemWriter = DocumentFactory
       .openDocument(storageUri)
       ._1
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index e490cde3d9..5be5d942e5 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -20,7 +20,8 @@
 package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
-import com.twitter.util.{Future, Return, Throw}
+import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, Return, Throw, Timer}
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -61,7 +62,7 @@ import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions
 
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration => ScalaDuration}
 
 /**
   * The executor of a region.
@@ -109,10 +110,14 @@ class RegionExecutionCoordinator(
   private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference(
     Unexecuted
   )
+  private val terminationFutureRef: AtomicReference[Future[Unit]] = new AtomicReference(null)
+  private val killRetryTimer: Timer = new JavaTimer(true)
+  private val killRetryDelay: TwitterDuration = TwitterDuration.fromMilliseconds(200)
 
   /**
     * Sync the status of `RegionExecution` and transition this coordinator's phase to `Completed` only when the
-    * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the ports of this region are completed.
+    * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and
+    * all workers in this region are terminated.
     *
     * Additionally, this method will also terminate all the workers of this region:
     *
@@ -135,12 +140,22 @@ class RegionExecutionCoordinator(
       return Future.Unit
     }
 
-    // Set this coordinator's status to be completed so that subsequent regions can be started by
-    // WorkflowExecutionCoordinator.
-    setPhase(Completed)
-
-    // Terminate all the workers in this region.
-    terminateWorkers(regionExecution)
+    val existingTerminationFuture = terminationFutureRef.get
+    if (existingTerminationFuture != null) {
+      existingTerminationFuture
+    } else {
+      val terminationFuture = terminateWorkersWithRetry(regionExecution).flatMap { _ =>
+        // Set this coordinator's status to be completed so that subsequent regions can be started by
+        // WorkflowExecutionCoordinator.
+        setPhase(Completed)
+        Future.Unit
+      }
+      if (terminationFutureRef.compareAndSet(null, terminationFuture)) {
+        terminationFuture
+      } else {
+        terminationFutureRef.get
+      }
+    }
   }
 
   private def terminateWorkers(regionExecution: RegionExecution) = {
@@ -167,7 +182,7 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the worker and send messages.
                 actorRefService.removeActorRef(workerId)
-                gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter()
+                gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter()
               }
           }.toSeq
 
@@ -191,8 +206,29 @@ class RegionExecutionCoordinator(
     }
   }
 
+  private def terminateWorkersWithRetry(
+      regionExecution: RegionExecution,
+      attempt: Int = 1
+  ): Future[Unit] = {
+    terminateWorkers(regionExecution).rescue { case err =>
+      logger.warn(
+        s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.",
+        err
+      )
+      Future
+        .sleep(killRetryDelay)(killRetryTimer)
+        .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))
+    }
+  }
+
   def isCompleted: Boolean = currentPhaseRef.get == Completed
 
+  /**
+    * Returns the region termination future if termination has been initiated.
+    * This is only set by `tryCompleteRegionExecution()`.
+    */
+  def getTerminationFutureOpt: Option[Future[Unit]] = Option(terminationFutureRef.get)
+
   /**
     * This will sync and transition the region execution phase from one to another depending on its current phase:
     *
@@ -528,12 +564,14 @@ class RegionExecutionCoordinator(
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
         val storageUriToAdd = portConfig.storageURI
+        val stateUriToAdd = State.stateUriFromResultUri(storageUriToAdd)
         val (_, eid, _, _) = decodeURI(storageUriToAdd)
         val schemaOptional =
           region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
         DocumentFactory.createDocument(storageUriToAdd, schema)
+        DocumentFactory.createDocument(stateUriToAdd, State.schema)
         WorkflowExecutionsResource.insertOperatorPortResultUri(
           eid = eid,
           globalPortId = outputPortId,
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index 3aa5fa90a4..65c560ee59 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -126,6 +126,7 @@ class DataProcessor(
       val outputState = executor.processState(state, port)
       if (outputState.isDefined) {
         outputManager.emitState(outputState.get)
+        outputManager.saveStateToStorageIfNeeded(state)
       }
     } catch safely {
       case e =>
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 10fbbc44a2..acada743bc 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.worker.managers
 
 import io.grpc.MethodDescriptor
 import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.core.state.State
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
@@ -45,7 +46,11 @@ import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{
   DPInputQueueElement,
   FIFOMessageElement
 }
-import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.ambermessage.{
+  DataFrame,
+  StateFrame,
+  WorkflowFIFOMessage
+}
 import org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage
 
 import java.net.URI
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
       }
       // Flush any remaining tuples in the buffer.
       if (buffer.nonEmpty) flush()
+
+      try {
+        val state_document =
+          DocumentFactory
+            .openDocument(State.stateUriFromResultUri(uri))
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+        val stateReadIterator = state_document.get()
+
+        while (stateReadIterator.hasNext) {
+          val state = State.deserialize(stateReadIterator.next())
+          inputMessageQueue.put(
+            FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state)))
+          )
+        }
+      } catch {
+        case _: Exception =>
+      }
+
       emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT)
       isFinished.set(true)
     } catch {
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 4957f31a40..f76a314b7a 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
+import java.net.URI
 import java.util.Base64
 import scala.jdk.CollectionConverters.IteratorHasAsScala
 
@@ -36,6 +37,9 @@ object State {
     new Attribute(StateContent, AttributeType.STRING)
   )
 
+  def stateUriFromResultUri(resultUri: URI): URI =
+    new URI(resultUri.toString.replace("/result", "/state"))
+
   def serialize(state: State): Tuple = {
     val payloadJson = objectMapper.writeValueAsString(toJsonValue(state))
     Tuple.builder(schema).addSequentially(Array(payloadJson)).build()
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 15949ef471..ae37def667 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -72,6 +72,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => "state"
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
         }
@@ -119,6 +120,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => "state"
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
         }
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
index 3513ac5ecd..990776a69f 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
@@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration {
   val RESULT: Value = Value("result")
   val RUNTIME_STATISTICS: Value = Value("runtimeStatistics")
   val CONSOLE_MESSAGES: Value = Value("consoleMessages")
+  val STATE: Value = Value("state")
 }
 
 object VFSURIFactory {

Reply via email to