This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5784-e270f830be7d11a59a8357eeecec2251d642c872
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 6433e713a08606eb952581828e8f9c360a763013
Author: Tanishq Gandhi <[email protected]>
AuthorDate: Tue Jun 23 12:33:44 2026 -0700

    fix(amber): surface real cause when output port schema is unavailable (#5784)
    
    ### What changes were proposed in this PR?
    
    When a workflow run fails because an output port's schema can't be
    resolved, the engine threw a generic `IllegalStateException("Schema is
    missing")`, discarding the actual cause. This typically happens when a
    dataset used by the workflow has not been shared with the user running
    it, but nothing in the error message says so.
    
    Root cause: in
    `RegionExecutionCoordinator.createOutputPortStorageObjects`, the output
    port schema is an `Either[Throwable, Schema]`, but it was unwrapped with
    `Either.getOrElse(throw new IllegalStateException("Schema is
    missing"))`. `getOrElse` is right-biased: on a `Left(cause)` it ignores
    the contained throwable and evaluates the default instead, so the
    `throw` fires and the real cause is lost.
    
    ```
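    Before:  Left(cause: "no access to dataset X")  --getOrElse(throw)-->  IllegalStateException("Schema is missing")
    After:   Left(cause)                            --match-->             IllegalStateException("Failed to resolve the output schema: " + reason, cause)
             where reason = cause.getMessage, or cause.toString if the message is null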
    ```
    
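    The fix matches on the `Either` and, on `Left`, logs the cause and throws
    an `IllegalStateException` that keeps the original throwable as its
    cause. The message uses `cause.getMessage`, falling back to
    `cause.toString` when the message is null, so it never reads "...: null".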
    
    **Before / After** (the message surfaces in the workspace Result Panel →
    Static Error frame):
    
    | | Message |
    |---|---|
    | **Before** | Schema is missing |
    | **After** | Failed to resolve the output schema: <real cause> (e.g. …: User has no access to this dataset) |
    
    ### Any related issues, documentation, discussions?
    
    Closes #3546
    
    ### How was this PR tested?
    
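    Added two regression tests in `RegionExecutionCoordinatorSpec`. Both
    seed an output port with a `Left(cause)` schema and assert that the
    coordinator throws an `IllegalStateException` whose `getCause` is the
    original cause:
    - The first checks that the exception message contains the cause's message.
    - The second uses a cause with a null message (a bare `NullPointerException`).
      It checks that the message contains `cause.toString` and does not end in "null".

    The existing positive-path tests in the same spec continue to pass.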
    
    ```
    sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionCoordinatorSpec"
    ...
    [info] Total number of tests run: 3
    [info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.8)
---
 .../scheduling/RegionExecutionCoordinator.scala    | 16 +++-
 .../RegionExecutionCoordinatorSpec.scala           | 97 +++++++++++++++++++++-
 2 files changed, 109 insertions(+), 4 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 4497d7c4ae..aba6eff759 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -572,10 +572,20 @@ class RegionExecutionCoordinator(
         val portBaseURI = portConfig.storageURIBase
         val resultURI = VFSURIFactory.resultURI(portBaseURI)
         val stateURI = VFSURIFactory.stateURI(portBaseURI)
-        val schemaOptional =
-          
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
-          schemaOptional.getOrElse(throw new IllegalStateException("Schema is 
missing"))
+          
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 match 
{
+            case Right(resolvedSchema) => resolvedSchema
+            case Left(cause)           =>
+              // The output port schema failed to resolve (e.g. a dataset the 
workflow reads is not
+              // shared with the running user, making its file and inferred 
schema unavailable).
+              // Surface the underlying cause instead of a generic "Schema is 
missing" (issue #3546).
+              val reason = Option(cause.getMessage).getOrElse(cause.toString)
+              logger.error(s"Output schema unavailable for port 
$outputPortId", cause)
+              throw new IllegalStateException(
+                s"Failed to resolve the output schema: $reason",
+                cause
+              )
+          }
         // An output port whose storage accumulates across region re-executions
         // (e.g. a LoopEnd port, whose output builds up over the iterations of
         // its own loop) sets `reuseStorage`. When set, the port's existing
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
index 9e6cb227e5..2c663612c4 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
@@ -22,13 +22,29 @@ package 
org.apache.texera.amber.engine.architecture.scheduling
 import com.twitter.util.Future
 import org.apache.pekko.actor.ActorSystem
 import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.storage.VFSURIFactory
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
-import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.core.workflow.{
+  GlobalPortIdentity,
+  OutputPort,
+  PhysicalOp,
+  PortIdentity
+}
+import org.apache.texera.amber.core.workflow.WorkflowContext.{
+  DEFAULT_EXECUTION_ID,
+  DEFAULT_WORKFLOW_ID
+}
 import 
org.apache.texera.amber.engine.architecture.common.PekkoActorRefMappingService
 import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns._
 import 
org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._
+import org.apache.texera.amber.engine.architecture.scheduling.config.{
+  OperatorConfig,
+  OutputPortConfig,
+  ResourceConfig,
+  WorkerConfig
+}
 import 
org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState
 import org.apache.texera.amber.engine.common.AmberRuntime
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
@@ -120,6 +136,85 @@ class RegionExecutionCoordinatorSpec
     assert(workerState(fixture) == WorkerState.TERMINATED)
   }
 
+  it should "surface the underlying cause when an output port schema is 
unavailable" in {
+    // Reproduces issue #3546: when schema inference for an output port fails 
(e.g. because a
+    // dataset used by the workflow has not been shared with the running 
user), the port's
+    // schema is stored as a `Left(cause)`. The coordinator must surface that 
real cause rather
+    // than discarding it behind a generic "Schema is missing" message.
+    val cause = new RuntimeException("User texera1 has no access to dataset 
'iris'")
+    val coordinator = coordinatorWithUnresolvedOutputSchema(cause)
+
+    val thrown = intercept[IllegalStateException] {
+      await(coordinator.syncStatusAndTransitionRegionExecutionPhase())
+    }
+    assert(thrown.getCause eq cause)
+    assert(thrown.getMessage.contains(cause.getMessage))
+  }
+
+  it should "fall back to the throwable's string form when the cause has no 
message" in {
+    // Some throwables (e.g. NullPointerException) carry a null message; the 
surfaced text must
+    // not read "...: null".
+    val cause = new NullPointerException()
+    assert(cause.getMessage == null)
+    val coordinator = coordinatorWithUnresolvedOutputSchema(cause)
+
+    val thrown = intercept[IllegalStateException] {
+      await(coordinator.syncStatusAndTransitionRegionExecutionPhase())
+    }
+    assert(thrown.getCause eq cause)
+    assert(thrown.getMessage.contains(cause.toString))
+    assert(!thrown.getMessage.endsWith("null"))
+  }
+
+  /**
+    * Builds a coordinator for a single-source region whose only output port 
has an unresolved
+    * schema (`Left(cause)`) and a configured output storage, so that the 
non-dependee phase
+    * reaches `createOutputPortStorageObjects` and attempts to read that 
schema.
+    */
+  private def coordinatorWithUnresolvedOutputSchema(
+      cause: Throwable
+  ): RegionExecutionCoordinator = {
+    val portId = PortIdentity(0)
+    val baseOp = 
createSourceOp("schema-missing-op").withOutputPorts(List(OutputPort(portId)))
+    val (outPort, links, _) = baseOp.outputPorts(portId)
+    val physicalOp =
+      baseOp.copy(outputPorts = baseOp.outputPorts.updated(portId, (outPort, 
links, Left(cause))))
+
+    val workerId = createWorkerId(physicalOp)
+    val globalPortId = GlobalPortIdentity(physicalOp.id, portId)
+    val storageBase =
+      VFSURIFactory.createPortBaseURI(DEFAULT_WORKFLOW_ID, 
DEFAULT_EXECUTION_ID, globalPortId)
+    val region = Region(
+      RegionIdentity(1),
+      physicalOps = Set(physicalOp),
+      physicalLinks = Set.empty,
+      resourceConfig = Some(
+        ResourceConfig(
+          operatorConfigs = Map(physicalOp.id -> 
OperatorConfig(List(WorkerConfig(workerId)))),
+          portConfigs = Map(globalPortId -> OutputPortConfig(storageBase))
+        )
+      )
+    )
+
+    val workflowExecution = WorkflowExecution()
+    seedReusableWorkerExecution(workflowExecution, seedRegionId = 0, 
physicalOp, workerId)
+    workflowExecution.initRegionExecution(region)
+
+    val rpcProbe = new ControllerRpcProbe(_ => None)
+    val controller = createControllerHarness()
+    registerLiveWorker(controller.actorRefService, workerId)
+
+    new RegionExecutionCoordinator(
+      region,
+      isRestart = false,
+      workflowExecution,
+      rpcProbe.asyncRPCClient,
+      ControllerConfig(None, None, None, None),
+      controller.actorService,
+      controller.actorRefService
+    )
+  }
+
   private case class SingleRegionFixture(
       coordinator: RegionExecutionCoordinator,
       rpcProbe: ControllerRpcProbe,

Reply via email to