Xiao-zhen-Liu commented on code in PR #4490:
URL: https://github.com/apache/texera/pull/4490#discussion_r3204022664


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -251,7 +264,10 @@ class OutputManager(
         writerThread.getFailure.foreach(throw _)
       case None =>
     }
-
+    this.stateWriterThreads.remove(outputPortId).foreach { writerThread =>

Review Comment:
   The result-writer cleanup directly above this calls 
`writerThread.getFailure.foreach(throw _)` so that an Iceberg commit failure 
surfaces as a FatalError to the controller (added recently in #4683). The 
state-writer cleanup uses the same `OutputPortResultWriterThread` but skips the 
`getFailure` check, so a state write failure here is silently swallowed and the 
worker still announces port completion as if the state had been durably written. 
Please mirror the failure rethrow on the state side too.
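
   A minimal sketch of the shape I have in mind, not the actual Texera code. 
`SketchWriterThread` is a hypothetical stand-in for 
`OutputPortResultWriterThread`, the map is keyed by a plain `String` instead of 
`PortIdentity`, and I am assuming a plain `join()` is enough to wait for the 
writer (the real thread may need a terminate signal first):

```scala
import scala.collection.mutable

// Hypothetical stand-in for OutputPortResultWriterThread: runs a task and
// records any exception instead of letting it die with the thread.
final class SketchWriterThread(task: () => Unit) extends Thread {
  @volatile private var failure: Option[Throwable] = None
  def getFailure: Option[Throwable] = failure
  override def run(): Unit =
    try task()
    catch { case t: Throwable => failure = Some(t) }
}

object StateWriterCleanupSketch {
  // Assumption: keyed by String here; the real map is keyed by PortIdentity.
  val stateWriterThreads: mutable.HashMap[String, SketchWriterThread] =
    mutable.HashMap()

  // Remove the writer for the port, wait for it to finish, then rethrow any
  // failure it recorded so the caller does not report a successful write.
  def closeStateWriter(portId: String): Unit =
    stateWriterThreads.remove(portId).foreach { writerThread =>
      writerThread.join()
      writerThread.getFailure.foreach(throw _)
    }
}
```

   With this shape, a failed state write propagates out of the cleanup the same 
way a failed result write does, instead of being dropped when the thread is 
removed from the map.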



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -124,6 +124,9 @@ class OutputManager(
       : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
+  private val stateWriterThreads: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =

Review Comment:
   Rename `OutputPortResultWriterThread` to `OutputPortStorageWriterThread`, 
since it is no longer used only for results?



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -568,18 +569,21 @@ class RegionExecutionCoordinator(
   ): Unit = {
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
-        val storageUriToAdd = portConfig.storageURI
-        val (_, eid, _, _) = decodeURI(storageUriToAdd)
+        val portBaseURI = portConfig.storageURI
+        val resultURI = VFSURIFactory.resultURI(portBaseURI)
+        val stateURI = VFSURIFactory.stateURI(portBaseURI)
         val schemaOptional =
           region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
-        DocumentFactory.createDocument(storageUriToAdd, schema)
+        DocumentFactory.createDocument(resultURI, schema)
+        DocumentFactory.createDocument(stateURI, State.schema)
         if (!isRestart) {
+          val (_, eid, _, _) = decodeURI(resultURI)
           WorkflowExecutionsResource.insertOperatorPortResultUri(
             eid = eid,
             globalPortId = outputPortId,
-            uri = storageUriToAdd
+            uri = resultURI
           )
         }
     }

Review Comment:
   Nit: The name `storageURI` is misleading now. It sounds like the URI you'd 
pass straight into `DocumentFactory.openDocument`, but it's actually the base 
from which `resultURI` and `stateURI` are derived. Worth renaming the field on 
`OutputPortConfig` (and the matching parameter on `OutputManager.addPort`) to 
`storageURIBase` to signal that it isn't the final URI.
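
   To illustrate the naming point only, here is a rough sketch. 
`OutputPortConfigSketch` and `UriDerivationSketch` are hypothetical names, and 
the `/result` and `/state` suffixes are invented for the example; I am not 
claiming this is how `VFSURIFactory.resultURI` or `VFSURIFactory.stateURI` 
actually build their URIs:

```scala
import java.net.URI

// Hypothetical config type: the field name makes clear that this is a base,
// not a URI that can be opened directly.
final case class OutputPortConfigSketch(storageURIBase: URI)

object UriDerivationSketch {
  // Assumption: derived URIs append a path segment to the base. The real
  // derivation in VFSURIFactory may differ.
  def resultURI(base: URI): URI = URI.create(base.toString + "/result")
  def stateURI(base: URI): URI = URI.create(base.toString + "/state")
}
```

   A call site then reads as 
`UriDerivationSketch.resultURI(config.storageURIBase)`, which makes it harder 
to pass the base to a document-opening call by mistake.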



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
