This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 6433e713a0 fix(amber): surface real cause when output port schema is
unavailable (#5784)
6433e713a0 is described below
commit 6433e713a08606eb952581828e8f9c360a763013
Author: Tanishq Gandhi <[email protected]>
AuthorDate: Tue Jun 23 12:33:44 2026 -0700
fix(amber): surface real cause when output port schema is unavailable
(#5784)
### What changes were proposed in this PR?
When a workflow run fails because an output port's schema can't be
resolved, the engine threw a generic `IllegalStateException("Schema is
missing")`, discarding the actual cause. It typically happens when a
dataset used by the workflow has not been shared with the user running
it, but nothing in the message says so.
Root cause: in
`RegionExecutionCoordinator.createOutputPortStorageObjects`, the output
port schema is an `Either[Throwable, Schema]`, but it was unwrapped with
`Either.getOrElse(throw new IllegalStateException("Schema is
missing"))`. `getOrElse` is right-biased: it returns the `Right` value
and, on a `Left(cause)`, evaluates the default instead, so the generic
exception was thrown and the real cause was discarded.
```
Before: Left(cause: "no access to dataset X") --getOrElse(throw)-->
"Schema is missing"
After: Left(cause) -->
IllegalStateException("Failed to resolve the output schema: " + cause.msg,
cause)
```
The fix matches on the `Either` and, on `Left`, throws an
`IllegalStateException` that keeps the original cause as the exception
cause and includes the cause's message (or its string form when the
message is null) in its own message.
**Before / After** (the message surfaces in the workspace Result Panel →
Static Error frame):
| | Message |
|---|---|
| **Before** | Schema is missing |
| **After** | Failed to resolve the output schema: `<real cause>` (e.g. …:
User has no access to this dataset) |
### Any related issues, documentation, discussions?
Closes #3546
### How was this PR tested?
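Added a regression test in `RegionExecutionCoordinatorSpec` that seeds
an output port with a `Left(cause)` schema and asserts the coordinator
throws an `IllegalStateException` whose `getCause` is the original cause
and whose message contains the underlying message. A second test covers
a cause whose message is null (e.g. `NullPointerException`) and asserts
that the surfaced message falls back to the throwable's string form. The
existing positive-path tests in the same spec continue to pass.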
```
sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionCoordinatorSpec"
...
[info] Total number of tests run: 3
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8)
---
.../scheduling/RegionExecutionCoordinator.scala | 16 +++-
.../RegionExecutionCoordinatorSpec.scala | 97 +++++++++++++++++++++-
2 files changed, 109 insertions(+), 4 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 4497d7c4ae..aba6eff759 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -572,10 +572,20 @@ class RegionExecutionCoordinator(
val portBaseURI = portConfig.storageURIBase
val resultURI = VFSURIFactory.resultURI(portBaseURI)
val stateURI = VFSURIFactory.stateURI(portBaseURI)
- val schemaOptional =
-
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
- schemaOptional.getOrElse(throw new IllegalStateException("Schema is
missing"))
+
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 match
{
+ case Right(resolvedSchema) => resolvedSchema
+ case Left(cause) =>
+ // The output port schema failed to resolve (e.g. a dataset the
workflow reads is not
+ // shared with the running user, making its file and inferred
schema unavailable).
+ // Surface the underlying cause instead of a generic "Schema is
missing" (issue #3546).
+ val reason = Option(cause.getMessage).getOrElse(cause.toString)
+ logger.error(s"Output schema unavailable for port
$outputPortId", cause)
+ throw new IllegalStateException(
+ s"Failed to resolve the output schema: $reason",
+ cause
+ )
+ }
// An output port whose storage accumulates across region re-executions
// (e.g. a LoopEnd port, whose output builds up over the iterations of
// its own loop) sets `reuseStorage`. When set, the port's existing
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
index 9e6cb227e5..2c663612c4 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
@@ -22,13 +22,29 @@ package
org.apache.texera.amber.engine.architecture.scheduling
import com.twitter.util.Future
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.storage.VFSURIFactory
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
-import org.apache.texera.amber.core.workflow.PhysicalOp
+import org.apache.texera.amber.core.workflow.{
+ GlobalPortIdentity,
+ OutputPort,
+ PhysicalOp,
+ PortIdentity
+}
+import org.apache.texera.amber.core.workflow.WorkflowContext.{
+ DEFAULT_EXECUTION_ID,
+ DEFAULT_WORKFLOW_ID
+}
import
org.apache.texera.amber.engine.architecture.common.PekkoActorRefMappingService
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
import org.apache.texera.amber.engine.architecture.rpc.controlreturns._
import
org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._
+import org.apache.texera.amber.engine.architecture.scheduling.config.{
+ OperatorConfig,
+ OutputPortConfig,
+ ResourceConfig,
+ WorkerConfig
+}
import
org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState
import org.apache.texera.amber.engine.common.AmberRuntime
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
@@ -120,6 +136,85 @@ class RegionExecutionCoordinatorSpec
assert(workerState(fixture) == WorkerState.TERMINATED)
}
+ it should "surface the underlying cause when an output port schema is
unavailable" in {
+ // Reproduces issue #3546: when schema inference for an output port fails
(e.g. because a
+ // dataset used by the workflow has not been shared with the running
user), the port's
+ // schema is stored as a `Left(cause)`. The coordinator must surface that
real cause rather
+ // than discarding it behind a generic "Schema is missing" message.
+ val cause = new RuntimeException("User texera1 has no access to dataset
'iris'")
+ val coordinator = coordinatorWithUnresolvedOutputSchema(cause)
+
+ val thrown = intercept[IllegalStateException] {
+ await(coordinator.syncStatusAndTransitionRegionExecutionPhase())
+ }
+ assert(thrown.getCause eq cause)
+ assert(thrown.getMessage.contains(cause.getMessage))
+ }
+
+ it should "fall back to the throwable's string form when the cause has no
message" in {
+ // Some throwables (e.g. NullPointerException) carry a null message; the
surfaced text must
+ // not read "...: null".
+ val cause = new NullPointerException()
+ assert(cause.getMessage == null)
+ val coordinator = coordinatorWithUnresolvedOutputSchema(cause)
+
+ val thrown = intercept[IllegalStateException] {
+ await(coordinator.syncStatusAndTransitionRegionExecutionPhase())
+ }
+ assert(thrown.getCause eq cause)
+ assert(thrown.getMessage.contains(cause.toString))
+ assert(!thrown.getMessage.endsWith("null"))
+ }
+
+ /**
+ * Builds a coordinator for a single-source region whose only output port
has an unresolved
+ * schema (`Left(cause)`) and a configured output storage, so that the
non-dependee phase
+ * reaches `createOutputPortStorageObjects` and attempts to read that
schema.
+ */
+ private def coordinatorWithUnresolvedOutputSchema(
+ cause: Throwable
+ ): RegionExecutionCoordinator = {
+ val portId = PortIdentity(0)
+ val baseOp =
createSourceOp("schema-missing-op").withOutputPorts(List(OutputPort(portId)))
+ val (outPort, links, _) = baseOp.outputPorts(portId)
+ val physicalOp =
+ baseOp.copy(outputPorts = baseOp.outputPorts.updated(portId, (outPort,
links, Left(cause))))
+
+ val workerId = createWorkerId(physicalOp)
+ val globalPortId = GlobalPortIdentity(physicalOp.id, portId)
+ val storageBase =
+ VFSURIFactory.createPortBaseURI(DEFAULT_WORKFLOW_ID,
DEFAULT_EXECUTION_ID, globalPortId)
+ val region = Region(
+ RegionIdentity(1),
+ physicalOps = Set(physicalOp),
+ physicalLinks = Set.empty,
+ resourceConfig = Some(
+ ResourceConfig(
+ operatorConfigs = Map(physicalOp.id ->
OperatorConfig(List(WorkerConfig(workerId)))),
+ portConfigs = Map(globalPortId -> OutputPortConfig(storageBase))
+ )
+ )
+ )
+
+ val workflowExecution = WorkflowExecution()
+ seedReusableWorkerExecution(workflowExecution, seedRegionId = 0,
physicalOp, workerId)
+ workflowExecution.initRegionExecution(region)
+
+ val rpcProbe = new ControllerRpcProbe(_ => None)
+ val controller = createControllerHarness()
+ registerLiveWorker(controller.actorRefService, workerId)
+
+ new RegionExecutionCoordinator(
+ region,
+ isRestart = false,
+ workflowExecution,
+ rpcProbe.asyncRPCClient,
+ ControllerConfig(None, None, None, None),
+ controller.actorService,
+ controller.actorRefService
+ )
+ }
+
private case class SingleRegionFixture(
coordinator: RegionExecutionCoordinator,
rpcProbe: ControllerRpcProbe,