This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new e0ee535060 fix(execution-service): surface init-time fatal errors to
the websocket (#5781)
e0ee535060 is described below
commit e0ee535060aa6ff00008987a30b8393150dc5583
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Jun 24 02:57:30 2026 +0000
fix(execution-service): surface init-time fatal errors to the websocket
(#5781)
### What changes were proposed in this PR?
When workflow execution initialization failed, the error was recorded
in the execution metadata store but never pushed to the websocket, so
connected frontend clients saw nothing. This was particularly true of
failures during `WorkflowExecutionService` construction, which happens
*before* the execution is published to subscribers.
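`WorkflowService.initExecutionService`'s catch arm now, after
`errorHandler(e)` records the fatal error, pushes a `WorkflowErrorEvent`
(carrying the recorded fatal errors) to `errorSubject`, the
workflow-level channel that `connect()` subscribers listen on, whenever
the execution was never published, so init-time failures surface in the
UI. Once the execution is published, the existing state-store path
delivers the error, and pushing again would double-emit.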
| init failure | before | after |
|---|---|---|
| during `WorkflowExecutionService` construction (pre-publish) | logged + stored, invisible to the UI | `WorkflowErrorEvent` delivered to the frontend |
| during `executeWorkflow()` | recorded; UI delivery depended on subscription timing | `WorkflowErrorEvent` delivered to the frontend |
The push is extracted into a small `reportFatalErrorsToSubscribers`
method so it can be unit-tested without a database (the init path itself
is DB-bound).
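
The pre-publish fallback can be sketched in isolation as follows. This is a
minimal illustration, not the Texera implementation: `FatalError`,
`ErrorEvent`, `InitErrorSketch`, and `InitErrorSketchDemo` are hypothetical
stand-ins, and a plain callback list takes the place of `errorSubject`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration; not Texera's real types.
final case class FatalError(message: String)
final case class ErrorEvent(fatalErrors: Seq[FatalError])

final class InitErrorSketch {
  // Stand-in for the workflow-level channel that subscribers listen on.
  private val subscribers = ArrayBuffer.empty[ErrorEvent => Unit]
  // Stand-in for the fatal errors recorded in the state store.
  private val recorded = ArrayBuffer.empty[FatalError]

  def connect(onEvent: ErrorEvent => Unit): Unit = { subscribers += onEvent; () }

  // The extracted seam: push whatever is currently recorded to every subscriber.
  def reportFatalErrors(): Unit = {
    val event = ErrorEvent(recorded.toList)
    subscribers.foreach(handler => handler(event))
  }

  def init(construct: () => Unit, execute: () => Unit): Unit = {
    var published = false
    try {
      construct()      // may throw before anything is published
      published = true // from here on, another path is assumed to deliver errors
      execute()
    } catch {
      case e: Throwable =>
        recorded += FatalError(String.valueOf(e.getMessage)) // always record
        if (!published) reportFatalErrors()                  // push only pre-publish
    }
  }
}

object InitErrorSketchDemo {
  // Returns the events a connected subscriber saw for one failing init.
  def run(failInConstructor: Boolean): List[ErrorEvent] = {
    val sketch = new InitErrorSketch
    val seen = ArrayBuffer.empty[ErrorEvent]
    sketch.connect(event => seen += event)
    sketch.init(
      construct = () => if (failInConstructor) throw new IllegalStateException("boom during init"),
      execute = () => if (!failInConstructor) throw new IllegalStateException("boom during execute")
    )
    seen.toList
  }
}
```

In this sketch a constructor failure reaches the subscriber as a single
event, while a failure after publication is only recorded; the sketch does not
model the state-store path that delivers it in the real service.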
### Any related issues, documentation, discussions?
Resolves #5782. Discovered while splitting #5700 (loop operators) into
smaller PRs; this fix is independent of that feature and applies to
`main` on its own.
### How was this PR tested?
New `WorkflowServiceSpec` (TDD, red → green): pins that
`reportFatalErrorsToSubscribers` delivers a `WorkflowErrorEvent` to a
`connect()` subscriber carrying exactly the fatal errors recorded in the
execution state store (single error, and all errors when several are
present). `sbt "WorkflowExecutionService/testOnly *WorkflowServiceSpec"`
passes (2/2); scalafmt + scalafix clean.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.
(backported from commit 1c580e59eb69bc45205298606c3980c67a05803f)
---
.../texera/web/service/WorkflowService.scala | 35 ++++++++-
.../texera/web/service/WorkflowServiceSpec.scala | 85 ++++++++++++++++++++++
2 files changed, 118 insertions(+), 2 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index c18b8b50e8..f6c67159ae 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -50,7 +50,7 @@ import org.apache.texera.amber.error.ErrorUtils.{
}
import org.apache.texera.dao.jooq.generated.tables.pojos.User
import org.apache.texera.service.util.LargeBinaryManager
-import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
+import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent}
import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import org.apache.texera.web.service.WorkflowService.mkWorkflowStateId
@@ -277,6 +277,14 @@ class WorkflowService(
}
}
}
+ // Once the execution is published via `executionService.onNext`, the normal
+ // state-store path surfaces fatal errors to the UI: `errorHandler` writes
+ // them into `executionStateStore.metadataStore`, whose diff handler (set up
+ // in the WorkflowExecutionService constructor) emits a WorkflowErrorEvent
+ // that `connectToExecution` forwards. Before that point, neither the emitter
+ // nor a subscriber exists yet, so a failure in the constructor itself would
+ // be recorded but never reach the frontend -- see the fallback in `catch`.
+ var executionPublished = false
try {
val execution = new WorkflowExecutionService(
controllerConf,
@@ -290,13 +298,36 @@ class WorkflowService(
)
lifeCycleManager.registerCleanUpOnStateChange(executionStateStore)
executionService.onNext(execution)
+ executionPublished = true
execution.executeWorkflow()
} catch {
- case e: Throwable => errorHandler(e)
+ case e: Throwable =>
+ errorHandler(e)
+ // If the execution was never published, no `connectToExecution`
+ // subscriber is bound to `executionStateStore`, so the state-store path
+ // above cannot deliver the error. Push it directly in that pre-publish
+ // window only; once published, the state-store path already surfaces it
+ // (pushing here too would double-emit).
+ if (!executionPublished) {
+ reportFatalErrorsToSubscribers(executionStateStore)
+ }
}
}
+ /**
+ * Push the fatal errors currently recorded in `stateStore` to connected
+ * websocket subscribers (via `errorSubject`).
+ *
+ * Fallback used only when execution initialization fails before the execution
+ * is published (e.g. the WorkflowExecutionService constructor throws): in that
+ * window the per-execution state store has no diff-handler emitter and no
+ * websocket subscriber, so the error -- already recorded by `errorHandler` --
+ * would otherwise be logged but never reach the frontend.
+ */
+ private[service] def reportFatalErrorsToSubscribers(stateStore: ExecutionStateStore): Unit =
+ errorSubject.onNext(WorkflowErrorEvent(stateStore.metadataStore.getState.fatalErrors))
+
def convertToJson(frontendVersion: String): String = {
val environmentVersionMap = Map(
"engine_version" -> Json.toJson(frontendVersion)
diff --git a/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala
new file mode 100644
index 0000000000..7c1d879c93
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import com.google.protobuf.timestamp.Timestamp
+import org.apache.texera.amber.core.virtualidentity.WorkflowIdentity
+import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
+import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError
+import org.apache.texera.web.model.websocket.event.{TexeraWebSocketEvent, WorkflowErrorEvent}
+import org.apache.texera.web.storage.ExecutionStateStore
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.time.Instant
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Unit tests for `WorkflowService.reportFatalErrorsToSubscribers`, the seam
+ * that surfaces init-time fatal errors to the websocket. When execution
+ * initialization fails, the error is recorded in the metadata store; this push
+ * is what makes it visible to connected clients instead of only logged.
+ */
+class WorkflowServiceSpec extends AnyFlatSpec with Matchers {
+
+ private def fatalError(message: String): WorkflowFatalError =
+ WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), message, "", "", "")
+
+ /** A WorkflowService with a subscriber collecting every event it pushes. */
+ private def serviceWithCollector(): (WorkflowService, ArrayBuffer[TexeraWebSocketEvent]) = {
+ val service = new WorkflowService(WorkflowIdentity(1), computingUnitId = 1, cleanUpTimeout = 30)
+ val events = ArrayBuffer.empty[TexeraWebSocketEvent]
+ service.connect(evt => events += evt)
+ (service, events)
+ }
+
+ private def errorEventsIn(events: ArrayBuffer[TexeraWebSocketEvent]): Seq[WorkflowErrorEvent] =
+ events.collect { case e: WorkflowErrorEvent => e }.toSeq
+
+ "WorkflowService" should
+ "push a WorkflowErrorEvent carrying the store's fatal error to connected subscribers" in {
+ val (service, events) = serviceWithCollector()
+ val store = new ExecutionStateStore()
+ val err = fatalError("boom during init")
+ store.metadataStore.updateState(_.addFatalErrors(err))
+
+ service.reportFatalErrorsToSubscribers(store)
+
+ val errorEvents = errorEventsIn(events)
+ errorEvents should have size 1
+ // Forwards exactly the store's fatal errors -- no more, no less.
+ errorEvents.head.fatalErrors should contain theSameElementsAs Seq(err)
+ }
+
+ it should "carry every fatal error currently recorded in the store" in {
+ val (service, events) = serviceWithCollector()
+ val store = new ExecutionStateStore()
+ val first = fatalError("first")
+ val second = fatalError("second")
+ store.metadataStore.updateState(_.addFatalErrors(first).addFatalErrors(second))
+
+ service.reportFatalErrorsToSubscribers(store)
+
+ val errorEvents = errorEventsIn(events)
+ errorEvents should have size 1
+ // Exactly the two recorded errors -- no extras.
+ errorEvents.head.fatalErrors should contain theSameElementsAs Seq(first, second)
+ }
+}