This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faa880c703cadba4521fc8ef885a242ded4b2ac7
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

    [FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java     | 13 ++++++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 51 +++++++++++++++++++++-
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 55c7875e85c..6d5f49d55b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -249,6 +249,19 @@ public class JobExceptionsHandler
 
     private static JobExceptionsInfoWithHistory.ExceptionInfo 
createExceptionInfo(
             ExceptionHistoryEntry exceptionHistoryEntry) {
+
+        if (exceptionHistoryEntry.isGlobal()) {
+            return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                    
exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+                    exceptionHistoryEntry.getExceptionAsString(),
+                    exceptionHistoryEntry.getTimestamp(),
+                    exceptionHistoryEntry.getFailureLabels(),
+                    null,
+                    null,
+                    null,
+                    null);
+        }
+
         assertLocalExceptionInfo(exceptionHistoryEntry);
 
         return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index f5354e5ce90..761881d1624 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -63,6 +63,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -215,6 +216,47 @@ class JobExceptionsHandlerTest {
         assertThat(response.getExceptionHistory().isTruncated()).isFalse();
     }
 
+    @Test
+    void testWithExceptionHistoryAndConcurrentGlobalFailure()
+            throws HandlerRequestException, ExecutionException, 
InterruptedException {
+        final ExceptionHistoryEntry otherFailure =
+                ExceptionHistoryEntry.createGlobal(
+                        new RuntimeException("exception #1"),
+                        
CompletableFuture.completedFuture(Collections.emptyMap()));
+        final RootExceptionHistoryEntry rootCause =
+                fromGlobalFailure(
+                        new RuntimeException("exception #0"),
+                        System.currentTimeMillis(),
+                        Collections.singleton(otherFailure));
+
+        final ExecutionGraphInfo executionGraphInfo = 
createExecutionGraphInfo(rootCause);
+        final HandlerRequest<EmptyRequestBody> request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response.getExceptionHistory().getEntries())
+                .hasSize(1)
+                .satisfies(
+                        matching(
+                                contains(
+                                        historyContainsGlobalFailure(
+                                                rootCause.getException(),
+                                                rootCause.getTimestamp(),
+                                                matchesFailure(
+                                                        
otherFailure.getException(),
+                                                        
otherFailure.getTimestamp(),
+                                                        
otherFailure.getFailureLabelsFuture(),
+                                                        
otherFailure.getFailingTaskName(),
+                                                        
JobExceptionsHandler.toString(
+                                                                otherFailure
+                                                                        
.getTaskManagerLocation()),
+                                                        
JobExceptionsHandler.toTaskManagerId(
+                                                                otherFailure
+                                                                        
.getTaskManagerLocation()))))));
+        assertThat(response.getExceptionHistory().isTruncated()).isFalse();
+    }
+
     @Test
     void testWithExceptionHistoryWithMatchingFailureLabel()
             throws HandlerRequestException, ExecutionException, 
InterruptedException {
@@ -540,13 +582,20 @@ class JobExceptionsHandlerTest {
     }
 
     private static RootExceptionHistoryEntry fromGlobalFailure(Throwable 
cause, long timestamp) {
+        return fromGlobalFailure(cause, timestamp, Collections.emptySet());
+    }
+
+    private static RootExceptionHistoryEntry fromGlobalFailure(
+            Throwable cause,
+            long timestamp,
+            Collection<ExceptionHistoryEntry> concurrentExceptions) {
         return new RootExceptionHistoryEntry(
                 cause,
                 timestamp,
                 FailureEnricherUtils.EMPTY_FAILURE_LABELS,
                 null,
                 null,
-                Collections.emptySet());
+                concurrentExceptions);
     }
 
     // -------- factory methods for instantiating new Matchers --------

Reply via email to