Copilot commented on code in PR #4441:
URL: https://github.com/apache/texera/pull/4441#discussion_r3121679805


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
       }
       // Flush any remaining tuples in the buffer.
       if (buffer.nonEmpty) flush()
+
+      try {
+        val state_document =
+          DocumentFactory
+            .openDocument(State.stateUriFromResultUri(uri))
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+        val stateReadIterator = state_document.get()

Review Comment:
   The local val name `state_document` doesn’t follow the surrounding Scala 
naming convention (camelCase). Rename it to `stateDocument` (and update the 
reference to it in the `stateReadIterator` initializer) to keep the codebase 
consistent.
   ```suggestion
           val stateDocument =
             DocumentFactory
               .openDocument(State.stateUriFromResultUri(uri))
               ._1
               .asInstanceOf[VirtualDocument[Tuple]]
           val stateReadIterator = stateDocument.get()
   ```



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala:
##########
@@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
     enforcers += enforcer
   }
 
+  def removeControlChannel(from: ActorVirtualIdentity): Unit = {
+    inputChannels.remove(ChannelIdentity(from, actorId, isControl = true))

Review Comment:
   removeControlChannel mutates the underlying mutable HashMap without any 
synchronization, while getChannel uses `synchronized` for updates and other 
methods iterate over the map. If removeControlChannel can run concurrently with 
DP/main threads, this can cause race conditions or HashMap corruption. Consider 
synchronizing this removal (or switching to a concurrent map) consistent with 
getChannel’s locking strategy.
   ```suggestion
       synchronized {
         inputChannels.remove(ChannelIdentity(from, actorId, isControl = true))
       }
   ```



##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala:
##########
@@ -19,39 +19,70 @@
 
 package org.apache.texera.amber.core.state
 
+import com.fasterxml.jackson.databind.JsonNode
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
 
-import scala.collection.mutable
+import java.net.URI
+import java.util.Base64
+import scala.jdk.CollectionConverters.IteratorHasAsScala
 
-final case class State(tuple: Option[Tuple] = None, passToAllDownstream: 
Boolean = false) {
-  val data: mutable.Map[String, (AttributeType, Any)] = mutable.LinkedHashMap()
-  add("passToAllDownstream", passToAllDownstream, AttributeType.BOOLEAN)
-  if (tuple.isDefined) {
-    tuple.get.getSchema.getAttributes.foreach { attribute =>
-      add(attribute.getName, tuple.get.getField(attribute.getName), 
attribute.getType)
-    }
-  }
+object State {
+  private val StateContent = "content"
+  private val BytesTypeMarker = "__texera_type__"
+  private val BytesValue = "bytes"
+  private val PayloadMarker = "payload"
 
-  def add(key: String, value: Any, valueType: AttributeType): Unit =
-    data.put(key, (valueType, value))
+  val schema: Schema = new Schema(
+    new Attribute(StateContent, AttributeType.STRING)
+  )
 
-  def get(key: String): Any = data(key)._2
+  def stateUriFromResultUri(resultUri: URI): URI =
+    new URI(resultUri.toString.replace("/result", "/state"))

Review Comment:
   stateUriFromResultUri uses a global string replace on the full URI 
("/result" -> "/state"). This can produce incorrect URIs if "/result" appears 
outside the resource-type suffix (e.g., in encoded globalPortId), and it 
doesn’t enforce that the URI actually ends with the RESULT resource type. 
Prefer constructing the STATE URI by decoding the VFS URI components and 
rebuilding it (or at least replacing only the final path segment).
   ```suggestion
     /**
       * Derives the STATE document URI that corresponds to a RESULT document URI.
       *
       * Only the trailing `/result` segment of the path is swapped for `/state`; every
       * other character of the URI is copied verbatim. The URI is rebuilt from its
       * string form on purpose: the multi-argument `java.net.URI` constructors always
       * quote `%`, so passing them raw (already percent-encoded) components would turn
       * `%2F` into `%252F` and corrupt encoded path segments.
       *
       * @param resultUri URI whose raw path ends with `/result`
       * @return the same URI with the final `/result` path segment replaced by `/state`
       * @throws IllegalArgumentException if the raw path does not end with `/result`
       */
     def stateUriFromResultUri(resultUri: URI): URI = {
       val resultSuffix = "/result"
       val stateSuffix = "/state"
       val rawPath = Option(resultUri.getRawPath).getOrElse("")
       require(
         rawPath.endsWith(resultSuffix),
         s"Expected result URI path to end with $resultSuffix: $resultUri"
       )
       // Query and fragment follow the path in the string form; carry them over untouched.
       val tail =
         Option(resultUri.getRawQuery).fold("")("?" + _) +
           Option(resultUri.getRawFragment).fold("")("#" + _)
       // What remains after removing the tail is scheme + authority + raw path.
       val head = resultUri.toString.stripSuffix(tail)
       require(
         head.endsWith(resultSuffix),
         s"Expected result URI path to end with $resultSuffix: $resultUri"
       )
       // The single-argument constructor parses the text as-is, so existing
       // percent-escapes are preserved instead of being escaped a second time.
       new URI(head.stripSuffix(resultSuffix) + stateSuffix + tail)
     }
   ```



##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -171,6 +180,31 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, 
port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
+    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> 
None:
+        if port_id is None:
+            uris = self._storage_uris.values()
+        elif port_id in self._storage_uris:
+            uris = [self._storage_uris[port_id]]
+        else:
+            return
+
+        for uri in uris:
+            state_uri = state_uri_from_result_uri(uri)
+            try:
+                document = DocumentFactory.open_document(state_uri)[0]
+            except ValueError:
+                document = DocumentFactory.create_document(state_uri, 
STATE_SCHEMA)
+            writer = document.writer(str(uuid.uuid4()))
+            writer.put_one(serialize_state(state))
+            writer.close()

Review Comment:
   save_state_to_storage_if_needed writes each state using a brand-new random 
writer id (uuid4) and closes immediately. This will likely create many tiny 
Iceberg snapshots/files and can severely impact performance and storage costs 
for frequent state emission. Consider reusing a stable writer id per worker 
(and buffering/batching writes) similar to tuple storage writers.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala:
##########
@@ -94,4 +94,8 @@ class NetworkOutputGateway(
     idToSequenceNums.getOrElseUpdate(channelId, new 
AtomicLong()).getAndIncrement()
   }
 
+  def removeControlChannel(to: ActorVirtualIdentity): Unit = {
+    idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true))
+  }

Review Comment:
   removeControlChannel mutates `idToSequenceNums` (a mutable.HashMap) without 
synchronization, but sequence numbers are also updated via getOrElseUpdate 
during sends. If gateway sends/removals can happen concurrently, this risks 
races. Align thread-safety with how this gateway is used (e.g., synchronize 
removal and sequence-number access, or use a concurrent map).



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -232,6 +234,23 @@ class OutputManager(
     })
   }
 
+  def saveStateToStorageIfNeeded(state: State): Unit = {
+    try {
+      storageUris.foreach {
+        case (_, uri) =>
+          val writer = DocumentFactory
+            .openDocument(State.stateUriFromResultUri(uri))
+            ._1
+            .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+            .asInstanceOf[BufferedItemWriter[Tuple]]
+          writer.putOne(State.serialize(state))
+          writer.close()
+      }
+    } catch {
+      case _: Exception => ()
+    }

Review Comment:
   saveStateToStorageIfNeeded catches and suppresses all Exceptions, which can 
silently drop persisted state and make restart/debugging very hard. At minimum, 
narrow the catch to expected cases (e.g., missing state document) and log 
unexpected failures (or propagate) so storage issues aren’t hidden.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
       }
       // Flush any remaining tuples in the buffer.
       if (buffer.nonEmpty) flush()
+
+      try {
+        val state_document =
+          DocumentFactory
+            .openDocument(State.stateUriFromResultUri(uri))
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+        val stateReadIterator = state_document.get()
+
+        while (stateReadIterator.hasNext) {
+          val state = State.deserialize(stateReadIterator.next())
+          inputMessageQueue.put(
+            FIFOMessageElement(WorkflowFIFOMessage(channelId, 
getSequenceNumber, StateFrame(state)))
+          )
+        }
+      } catch {
+        case _: Exception =>
+      }

Review Comment:
   The try/catch around reading the state document swallows all Exceptions. 
This can hide real corruption/permission/IO issues and make region restart 
behavior non-deterministic. Prefer catching only the expected “document not 
found” case (or logging unexpected exceptions) instead of a blanket catch.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -232,6 +234,23 @@ class OutputManager(
     })
   }
 
+  def saveStateToStorageIfNeeded(state: State): Unit = {
+    try {
+      storageUris.foreach {
+        case (_, uri) =>
+          val writer = DocumentFactory
+            .openDocument(State.stateUriFromResultUri(uri))
+            ._1
+            .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+            .asInstanceOf[BufferedItemWriter[Tuple]]
+          writer.putOne(State.serialize(state))
+          writer.close()
+      }

Review Comment:
   saveStateToStorageIfNeeded opens the document, writes a single tuple, and 
closes the writer for every state and every storage URI. This will likely 
generate a large number of tiny commits/files and can become a major 
bottleneck. Consider buffering states (similar to tuple storage) or reusing a 
long-lived writer per (port, worker) and flushing on port finalization.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -191,8 +208,29 @@ class RegionExecutionCoordinator(
     }
   }
 
+  private def terminateWorkersWithRetry(
+      regionExecution: RegionExecution,
+      attempt: Int = 1
+  ): Future[Unit] = {
+    terminateWorkers(regionExecution).rescue { case err =>
+      logger.warn(
+        s"Failed to terminate region ${region.id.id} on attempt $attempt. 
Retrying in ${killRetryDelay.inMilliseconds} ms.",
+        err
+      )
+      Future
+        .sleep(killRetryDelay)(killRetryTimer)
+        .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))

Review Comment:
   terminateWorkersWithRetry retries indefinitely with a fixed 200ms delay and 
never gives up. If termination keeps failing (e.g., actorRef lookup fails, gRPC 
hangs), this can loop forever and prevent workflow progress, while also keeping 
the JavaTimer alive. Add a max-attempts / total-time budget and surface a 
terminal failure (and consider exponential backoff).
   ```suggestion
     private val maxKillRetryAttempts = 10
     private val maxKillRetryElapsed: ScalaDuration = ScalaDuration(30, 
TimeUnit.SECONDS)
   
     private def terminationRetryDelay(attempt: Int): TwitterDuration = {
       val exponentialMultiplier = math.pow(2.0, math.max(0, attempt - 
1).toDouble)
       val delayMillis = math.min(
         (killRetryDelay.inMilliseconds * exponentialMultiplier).toLong,
         maxKillRetryElapsed.toMillis
       )
       TwitterDuration.fromMilliseconds(delayMillis)
     }
   
     private def terminateWorkersWithRetry(
         regionExecution: RegionExecution,
         attempt: Int = 1,
         startedAtNanos: Long = System.nanoTime()
     ): Future[Unit] = {
       terminateWorkers(regionExecution).rescue { case err =>
         val elapsed = ScalaDuration.fromNanos(System.nanoTime() - 
startedAtNanos)
         val attemptsExhausted = attempt >= maxKillRetryAttempts
         val timeExhausted = elapsed >= maxKillRetryElapsed
   
         if (attemptsExhausted || timeExhausted) {
           val failureMessage =
             s"Failed to terminate region ${region.id.id} after $attempt 
attempt(s) over ${elapsed.toMillis} ms. Giving up."
           logger.error(failureMessage, err)
           Future.exception(new RuntimeException(failureMessage, err))
         } else {
           val retryDelay = terminationRetryDelay(attempt)
           logger.warn(
             s"Failed to terminate region ${region.id.id} on attempt $attempt. 
Retrying in ${retryDelay.inMilliseconds} ms.",
             err
           )
           Future
             .sleep(retryDelay)(killRetryTimer)
             .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 
1, startedAtNanos))
         }
   ```



##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -125,6 +126,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) -> 
typing.Iterator[DataFrame
             if receiver == self.worker_actor_id:
                 yield self.tuples_to_data_frame(tuples)
 
+    def emit_state_with_filter(self, state: State) -> 
typing.Iterator[StateFrame]:
+        for receiver, payload in self.partitioner.flush_state(state):
+            if receiver == self.worker_actor_id:
+                yield (
+                    StateFrame(payload)
+                    if isinstance(payload, dict)
+                    else self.tuples_to_data_frame(payload)
+                )

Review Comment:
   emit_state_with_filter is annotated as returning Iterator[StateFrame], but 
it can yield either a StateFrame (dict payload) or a DataFrame (tuple-list 
payload). Update the return type annotation to match the actual yielded types 
(e.g., Iterator[DataPayload] / Union[StateFrame, DataFrame]) to avoid 
misleading typing and downstream misuse.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala:
##########
@@ -126,6 +126,7 @@ class DataProcessor(
       val outputState = executor.processState(state, port)
       if (outputState.isDefined) {
         outputManager.emitState(outputState.get)
+        outputManager.saveStateToStorageIfNeeded(state)

Review Comment:
   In processInputState, the state persisted to storage is the *input* `state`, 
not the `outputState` that is actually emitted downstream. This will cause 
restarts/materialization replay to load stale/incorrect state when executors 
transform the state. Persist the same state object that `emitState` sends 
(i.e., the produced output state).
   ```suggestion
           val producedState = outputState.get
           outputManager.emitState(producedState)
           outputManager.saveStateToStorageIfNeeded(producedState)
   ```



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala:
##########
@@ -59,18 +62,19 @@ class WorkflowExecutionCoordinator(
     * After the syncs, if there are no running region(s), it will start new 
regions (if available).
     */
   def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] 
= {
-    if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
-      // As this method is invoked by the completion of each port in a region, 
and regionExecutionCoordinator only
-      // lanuches each phase asynchronously, we need to let each current 
unfinished regionExecutionCoordinator
-      // sync its status and proceed with next phases if needed.
-      Future
-        .collect({
-          regionExecutionCoordinators.values
-            .filter(!_.isCompleted)
-            .map(_.syncStatusAndTransitionRegionExecutionPhase())
-            .toSeq
-        })
+    val unfinishedRegionCoordinators =
+      regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
+
+    // Trigger sync for each unfinished region.
+    
unfinishedRegionCoordinators.foreach(_.syncStatusAndTransitionRegionExecutionPhase())
+
+    // Wait only for region termination futures (kill path), then re-run 
coordination.
+    val terminationFutures = 
unfinishedRegionCoordinators.flatMap(_.getTerminationFutureOpt)
+    if (terminationFutures.nonEmpty) {

Review Comment:
   coordinateRegionExecutors now triggers 
syncStatusAndTransitionRegionExecutionPhase() via foreach but discards the 
returned Futures. This can drop failures (unhandled exceptions) and makes the 
coordinator’s returned Future not reflect the work it kicked off. Consider 
collecting/returning these Futures (or attaching error handling) so errors 
during phase transitions/termination aren’t silently ignored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to