This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 34b004d012 fix(amber): surface writer-thread failure as FatalError
instead of silent hang (#4683)
34b004d012 is described below
commit 34b004d012cc6a87fc2611a666aa1376d159c462
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 5 23:54:12 2026 -0700
fix(amber): surface writer-thread failure as FatalError instead of silent
hang (#4683)
### What changes were proposed in this PR?
When `OutputPortResultWriterThread.run()` throws (e.g. iceberg
commit-retry budget exhausted), the writer thread dies silently and the
worker still reports `portCompleted` to the controller. The user sees a
1-minute completion timeout with no signal pointing at iceberg.
Capture the failure on the writer thread, re-throw it from
`OutputManager.closeOutputStorageWriterIfNeeded`, and let the existing
DP-thread → worker-actor → controller-supervisor path surface it as a
`FatalError` to the client.
### Any related issues, documentation, discussions?
Closes #4682.
### How was this PR tested?
`OutputPortResultWriterThreadSpec` (6 tests) covers: a clean run; a putOne
failure (close() still runs); a close() failure; both failing (the close()
exception is attached as suppressed to the original putOne exception); and the
`OutputManager.closeOutputStorageWriterIfNeeded` re-throw and no-op cases.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7, 1M context)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../messaginglayer/OutputManager.scala | 6 +
.../managers/OutputPortResultWriterThread.scala | 38 +++++-
.../OutputPortResultWriterThreadSpec.scala | 151 +++++++++++++++++++++
3 files changed, 188 insertions(+), 7 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 4ab3d18056..affbd786f9 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -235,6 +235,11 @@ class OutputManager(
/**
* Singal the port storage writer to flush the remaining buffer and wait
for commits to finish so that
* the output port is properly completed. If the output port does not need
storage, no action will be done.
+ *
+ * If the writer thread captured a failure (e.g., iceberg commit retries
+ * exhausted), re-throw it here so the DP thread surfaces a FatalError
+ * to the controller via pekko's supervisor strategy. Otherwise the worker
+ * would announce port completion as if the result was durably written.
*/
def closeOutputStorageWriterIfNeeded(outputPortId: PortIdentity): Unit = {
this.outputPortResultWriterThreads.get(outputPortId) match {
@@ -243,6 +248,7 @@ class OutputManager(
writerThread.queue.put(Right(PortStorageWriterTerminateSignal))
// Blocking call
writerThread.join()
+ writerThread.getFailure.foreach(throw _)
case None =>
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
index 28e5d2af66..4223d920da 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
@@ -24,6 +24,7 @@ import
org.apache.texera.amber.core.storage.model.BufferedItemWriter
import org.apache.texera.amber.core.tuple.Tuple
import java.util.concurrent.LinkedBlockingQueue
+import scala.util.control.NonFatal
sealed trait TerminateSignal
case object PortStorageWriterTerminateSignal extends TerminateSignal
@@ -35,15 +36,38 @@ class OutputPortResultWriterThread(
val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] =
Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]()
+ // Captured failure from put-one or close() so the worker DP thread can
+ // re-throw and let the controller's pekko supervisor surface a FatalError
+ // to the client. Without this, the writer thread dies silently and the
+ // worker keeps reporting normal port completion to the controller while
+ // results are missing or stale, leading to e2e timeouts that hide the
+ // real cause.
+ @volatile private var failure: Option[Throwable] = None
+ def getFailure: Option[Throwable] = failure
+
override def run(): Unit = {
- var internalStop = false
- while (!internalStop) {
- val queueContent = queue.take()
- queueContent match {
- case Left(tuple) => bufferedItemWriter.putOne(tuple)
- case Right(_) => internalStop = true
+ try {
+ var internalStop = false
+ while (!internalStop) {
+ queue.take() match {
+ case Left(tuple) => bufferedItemWriter.putOne(tuple)
+ case Right(_) => internalStop = true
+ }
+ }
+ } catch {
+ case NonFatal(e) => failure = Some(e)
+ } finally {
+ // close() runs even when the loop threw, so a putOne failure does
+ // not leak the underlying writer's file handles. If both legs fail,
+ // attach close()'s exception as suppressed on the original.
+ try bufferedItemWriter.close()
+ catch {
+ case NonFatal(e) =>
+ failure match {
+ case Some(orig) => orig.addSuppressed(e)
+ case None => failure = Some(e)
+ }
}
}
- bufferedItemWriter.close()
}
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
new file mode 100644
index 0000000000..31d8c41611
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThreadSpec.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.worker.managers
+
+import org.apache.texera.amber.core.storage.model.BufferedItemWriter
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+ NetworkOutputGateway,
+ OutputManager
+}
+import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable
+
+class OutputPortResultWriterThreadSpec extends AnyFlatSpec {
+
+ private class StubWriter(
+ onPutOne: () => Unit = () => (),
+ onClose: () => Unit = () => ()
+ ) extends BufferedItemWriter[Tuple] {
+ val bufferSize: Int = 1024
+ var closeCalled = false
+ def open(): Unit = ()
+ def putOne(item: Tuple): Unit = onPutOne()
+ def removeOne(item: Tuple): Unit = ()
+ def close(): Unit = {
+ closeCalled = true
+ onClose()
+ }
+ }
+
+ private def throwing(msg: String): () => Unit = () => throw new
RuntimeException(msg)
+
+ "OutputPortResultWriterThread" should "leave getFailure empty on a clean
run" in {
+ val writer = new StubWriter()
+ val thread = new OutputPortResultWriterThread(writer)
+ thread.start()
+ thread.queue.put(Right(PortStorageWriterTerminateSignal))
+ thread.join()
+ assert(thread.getFailure.isEmpty)
+ assert(writer.closeCalled)
+ }
+
+ it should "capture a close() exception in getFailure so the worker can
re-throw" in {
+ val writer = new StubWriter(onClose = throwing("test close failure"))
+ val thread = new OutputPortResultWriterThread(writer)
+ thread.start()
+ thread.queue.put(Right(PortStorageWriterTerminateSignal))
+ thread.join()
+ assert(thread.getFailure.exists(_.getMessage.contains("test close
failure")))
+ assert(writer.closeCalled)
+ }
+
+ it should "capture a putOne exception and still call close()" in {
+ val writer = new StubWriter(onPutOne = throwing("test putOne failure"))
+ val thread = new OutputPortResultWriterThread(writer)
+ thread.start()
+ thread.queue.put(Left(null.asInstanceOf[Tuple]))
+ thread.queue.put(Right(PortStorageWriterTerminateSignal))
+ thread.join()
+ assert(thread.getFailure.exists(_.getMessage.contains("test putOne
failure")))
+ // The finally clause must run close() even after putOne threw, or
+ // the underlying writer leaks file handles.
+ assert(writer.closeCalled)
+ }
+
+ it should "preserve both errors when putOne and close() fail in the same
run" in {
+ val writer = new StubWriter(
+ onPutOne = throwing("test putOne failure"),
+ onClose = throwing("test close failure")
+ )
+ val thread = new OutputPortResultWriterThread(writer)
+ thread.start()
+ thread.queue.put(Left(null.asInstanceOf[Tuple]))
+ thread.queue.put(Right(PortStorageWriterTerminateSignal))
+ thread.join()
+ val captured = thread.getFailure.getOrElse(fail("expected putOne failure"))
+ assert(captured.getMessage.contains("test putOne failure"))
+ assert(
+ captured.getSuppressed.exists(_.getMessage.contains("test close
failure")),
+ "close() failure should be attached as suppressed on the original putOne
failure"
+ )
+ }
+
+ // Reach into OutputManager's private outputPortResultWriterThreads map to
+ // install a writer thread whose close() has already failed. This pins the
+ // contract that closeOutputStorageWriterIfNeeded re-throws the captured
+ // failure, which is the bridge from the writer thread to the DP thread →
+ // worker actor → controller supervisor → FatalError to client.
+ private def installWriterThread(
+ manager: OutputManager,
+ portId: PortIdentity,
+ thread: OutputPortResultWriterThread
+ ): Unit = {
+ val field = classOf[OutputManager]
+ .getDeclaredField("outputPortResultWriterThreads")
+ field.setAccessible(true)
+ field
+ .get(manager)
+ .asInstanceOf[mutable.HashMap[PortIdentity,
OutputPortResultWriterThread]]
+ .put(portId, thread)
+ }
+
+ "OutputManager.closeOutputStorageWriterIfNeeded" should
+ "re-throw the writer thread's captured failure" in {
+ val identifier = ActorVirtualIdentity("test-worker")
+ val outputManager = new OutputManager(
+ identifier,
+ new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ())
+ )
+ val portId = PortIdentity()
+ val failingWriter = new StubWriter(onClose = throwing("test close
failure"))
+ val failingThread = new OutputPortResultWriterThread(failingWriter)
+ failingThread.start()
+ installWriterThread(outputManager, portId, failingThread)
+ val ex = intercept[RuntimeException] {
+ outputManager.closeOutputStorageWriterIfNeeded(portId)
+ }
+ assert(ex.getMessage.contains("test close failure"))
+ }
+
+ it should "be a no-op when the port has no writer thread" in {
+ val identifier = ActorVirtualIdentity("test-worker")
+ val outputManager = new OutputManager(
+ identifier,
+ new NetworkOutputGateway(identifier, (_: WorkflowFIFOMessage) => ())
+ )
+ // No installWriterThread call — the port has never had a writer.
+ outputManager.closeOutputStorageWriterIfNeeded(PortIdentity())
+ }
+}