Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177731188 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java --- @@ -19,45 +19,81 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Testing fatal error handler which records the occurred exceptions during the execution of the * tests. Captured exceptions are thrown as a {@link TestingException}. */ public class TestingFatalErrorHandler implements FatalErrorHandler { private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); - private final AtomicReference<Throwable> atomicThrowable; + private CompletableFuture<Throwable> errorFuture; public TestingFatalErrorHandler() { - atomicThrowable = new AtomicReference<>(null); + errorFuture = new CompletableFuture<>(); } - public void rethrowError() throws TestingException { - Throwable throwable = atomicThrowable.get(); + public synchronized void rethrowError() throws TestingException { + final Throwable throwable = getException(); if (throwable != null) { - throw new TestingException(throwable); + throw new TestingException(throwable); + } + } + + public synchronized boolean hasExceptionOccurred() { + return errorFuture.isDone(); + } + + @Nullable + public synchronized Throwable getException() { + if (errorFuture.isDone()) { + Throwable throwable = null; + + try { + throwable = errorFuture.get(); + } catch (InterruptedException ie) { + Thread.interrupted(); --- End diff -- Given that we want to keep the interrupted flag set for the caller, I think we have to call `Thread.interrupted`. Otherwise, other components which are called later will miss the interruption. Actually it would be better to call `ExceptionUtils.checkInterrupted` which is a utility for this behaviour.
---