This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fcb581f0039f9704b6eaf15a2fabaa4e05d79048
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

    [FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java     | 12 +++++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 53 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 5ece82a2671..84140c8c007 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -248,6 +248,18 @@ public class JobExceptionsHandler
 
     private static JobExceptionsInfoWithHistory.ExceptionInfo 
createExceptionInfo(
             ExceptionHistoryEntry exceptionHistoryEntry) {
+
+        if (exceptionHistoryEntry.isGlobal()) {
+            return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                    
exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+                    exceptionHistoryEntry.getExceptionAsString(),
+                    exceptionHistoryEntry.getTimestamp(),
+                    exceptionHistoryEntry.getFailureLabels(),
+                    null,
+                    null,
+                    null);
+        }
+
         assertLocalExceptionInfo(exceptionHistoryEntry);
 
         return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index efce7903686..c7699c6f951 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
+import org.assertj.core.api.Assertions;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
@@ -64,6 +65,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +75,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.assertj.core.api.HamcrestCondition.matching;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.collection.IsEmptyCollection.empty;
@@ -214,6 +217,47 @@ public class JobExceptionsHandlerTest extends TestLogger {
         assertFalse(response.getExceptionHistory().isTruncated());
     }
 
+    @Test
+    public void testWithExceptionHistoryAndConcurrentGlobalFailure()
+            throws HandlerRequestException, ExecutionException, 
InterruptedException {
+        final ExceptionHistoryEntry otherFailure =
+                ExceptionHistoryEntry.createGlobal(
+                        new RuntimeException("exception #1"),
+                        
CompletableFuture.completedFuture(Collections.emptyMap()));
+        final RootExceptionHistoryEntry rootCause =
+                fromGlobalFailure(
+                        new RuntimeException("exception #0"),
+                        System.currentTimeMillis(),
+                        Collections.singleton(otherFailure));
+
+        final ExecutionGraphInfo executionGraphInfo = 
createExecutionGraphInfo(rootCause);
+        final HandlerRequest<EmptyRequestBody> request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        Assertions.assertThat(response.getExceptionHistory().getEntries())
+                .hasSize(1)
+                .satisfies(
+                        matching(
+                                contains(
+                                        historyContainsGlobalFailure(
+                                                rootCause.getException(),
+                                                rootCause.getTimestamp(),
+                                                matchesFailure(
+                                                        
otherFailure.getException(),
+                                                        
otherFailure.getTimestamp(),
+                                                        
otherFailure.getFailureLabelsFuture(),
+                                                        
otherFailure.getFailingTaskName(),
+                                                        
JobExceptionsHandler.toString(
+                                                                otherFailure
+                                                                        
.getTaskManagerLocation()),
+                                                        
JobExceptionsHandler.toTaskManagerId(
+                                                                otherFailure
+                                                                        
.getTaskManagerLocation()))))));
+        
Assertions.assertThat(response.getExceptionHistory().isTruncated()).isFalse();
+    }
+
     @Test
     public void testWithExceptionHistoryWithMatchingFailureLabel()
             throws HandlerRequestException, ExecutionException, 
InterruptedException {
@@ -532,13 +576,20 @@ public class JobExceptionsHandlerTest extends TestLogger {
     }
 
     private static RootExceptionHistoryEntry fromGlobalFailure(Throwable 
cause, long timestamp) {
+        return fromGlobalFailure(cause, timestamp, Collections.emptySet());
+    }
+
+    private static RootExceptionHistoryEntry fromGlobalFailure(
+            Throwable cause,
+            long timestamp,
+            Collection<ExceptionHistoryEntry> concurrentExceptions) {
         return new RootExceptionHistoryEntry(
                 cause,
                 timestamp,
                 FailureEnricherUtils.EMPTY_FAILURE_LABELS,
                 null,
                 null,
-                Collections.emptySet());
+                concurrentExceptions);
     }
 
     // -------- factory methods for instantiating new Matchers --------

Reply via email to