(flink) 01/02: [FLINK-34922][rest] Support concurrent global failure

2024-03-28 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fcb581f0039f9704b6eaf15a2fabaa4e05d79048
Author: Chesnay Schepler 
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

[FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java | 12 +
 .../rest/handler/job/JobExceptionsHandlerTest.java | 53 +-
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 5ece82a2671..84140c8c007 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -248,6 +248,18 @@ public class JobExceptionsHandler
 
 private static JobExceptionsInfoWithHistory.ExceptionInfo 
createExceptionInfo(
 ExceptionHistoryEntry exceptionHistoryEntry) {
+
+if (exceptionHistoryEntry.isGlobal()) {
+return new JobExceptionsInfoWithHistory.ExceptionInfo(
+
exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+exceptionHistoryEntry.getExceptionAsString(),
+exceptionHistoryEntry.getTimestamp(),
+exceptionHistoryEntry.getFailureLabels(),
+null,
+null,
+null);
+}
+
 assertLocalExceptionInfo(exceptionHistoryEntry);
 
 return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index efce7903686..c7699c6f951 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
+import org.assertj.core.api.Assertions;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
@@ -64,6 +65,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +75,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.assertj.core.api.HamcrestCondition.matching;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.collection.IsEmptyCollection.empty;
@@ -214,6 +217,47 @@ public class JobExceptionsHandlerTest extends TestLogger {
 assertFalse(response.getExceptionHistory().isTruncated());
 }
 
+@Test
+public void testWithExceptionHistoryAndConcurrentGlobalFailure()
+throws HandlerRequestException, ExecutionException, 
InterruptedException {
+final ExceptionHistoryEntry otherFailure =
+ExceptionHistoryEntry.createGlobal(
+new RuntimeException("exception #1"),
+
CompletableFuture.completedFuture(Collections.emptyMap()));
+final RootExceptionHistoryEntry rootCause =
+fromGlobalFailure(
+new RuntimeException("exception #0"),
+System.currentTimeMillis(),
+Collections.singleton(otherFailure));
+
+final ExecutionGraphInfo executionGraphInfo = 
createExecutionGraphInfo(rootCause);
+final HandlerRequest request =
+createRequest(executionGraphInfo.getJobId(), 10);
+final JobExceptionsInfoWithHistory response =
+testInstance.handleRequest(request, executionGraphInfo);
+
+Assertions.assertThat(response.getExceptionHistory().getEntries())
+.hasSize(1)
+.satisfies(
+matching(
+contains(
+historyContainsGlobalFailure(
+rootCause.getException(),
+rootCause.getTimestamp(),
+matchesFailure(
+

(flink) 01/02: [FLINK-34922][rest] Support concurrent global failure

2024-03-28 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faa880c703cadba4521fc8ef885a242ded4b2ac7
Author: Chesnay Schepler 
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

[FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java | 13 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 51 +-
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 55c7875e85c..6d5f49d55b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -249,6 +249,19 @@ public class JobExceptionsHandler
 
 private static JobExceptionsInfoWithHistory.ExceptionInfo 
createExceptionInfo(
 ExceptionHistoryEntry exceptionHistoryEntry) {
+
+if (exceptionHistoryEntry.isGlobal()) {
+return new JobExceptionsInfoWithHistory.ExceptionInfo(
+
exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+exceptionHistoryEntry.getExceptionAsString(),
+exceptionHistoryEntry.getTimestamp(),
+exceptionHistoryEntry.getFailureLabels(),
+null,
+null,
+null,
+null);
+}
+
 assertLocalExceptionInfo(exceptionHistoryEntry);
 
 return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index f5354e5ce90..761881d1624 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -63,6 +63,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -215,6 +216,47 @@ class JobExceptionsHandlerTest {
 assertThat(response.getExceptionHistory().isTruncated()).isFalse();
 }
 
+@Test
+void testWithExceptionHistoryAndConcurrentGlobalFailure()
+throws HandlerRequestException, ExecutionException, 
InterruptedException {
+final ExceptionHistoryEntry otherFailure =
+ExceptionHistoryEntry.createGlobal(
+new RuntimeException("exception #1"),
+
CompletableFuture.completedFuture(Collections.emptyMap()));
+final RootExceptionHistoryEntry rootCause =
+fromGlobalFailure(
+new RuntimeException("exception #0"),
+System.currentTimeMillis(),
+Collections.singleton(otherFailure));
+
+final ExecutionGraphInfo executionGraphInfo = 
createExecutionGraphInfo(rootCause);
+final HandlerRequest request =
+createRequest(executionGraphInfo.getJobId(), 10);
+final JobExceptionsInfoWithHistory response =
+testInstance.handleRequest(request, executionGraphInfo);
+
+assertThat(response.getExceptionHistory().getEntries())
+.hasSize(1)
+.satisfies(
+matching(
+contains(
+historyContainsGlobalFailure(
+rootCause.getException(),
+rootCause.getTimestamp(),
+matchesFailure(
+
otherFailure.getException(),
+
otherFailure.getTimestamp(),
+
otherFailure.getFailureLabelsFuture(),
+
otherFailure.getFailingTaskName(),
+
JobExceptionsHandler.toString(
+otherFailure
+
.getTaskManagerLocation()),
+
JobExceptionsHandler.toTaskManagerId(
+otherFailure
+

(flink) 01/02: [FLINK-34922][rest] Support concurrent global failure

2024-03-28 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc957bfdc3aa6a8e3bce603cfc68c5c553c72220
Author: Chesnay Schepler 
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

[FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java | 13 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 51 +-
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 55c7875e85c..6d5f49d55b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -249,6 +249,19 @@ public class JobExceptionsHandler
 
 private static JobExceptionsInfoWithHistory.ExceptionInfo 
createExceptionInfo(
 ExceptionHistoryEntry exceptionHistoryEntry) {
+
+if (exceptionHistoryEntry.isGlobal()) {
+return new JobExceptionsInfoWithHistory.ExceptionInfo(
+
exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+exceptionHistoryEntry.getExceptionAsString(),
+exceptionHistoryEntry.getTimestamp(),
+exceptionHistoryEntry.getFailureLabels(),
+null,
+null,
+null,
+null);
+}
+
 assertLocalExceptionInfo(exceptionHistoryEntry);
 
 return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index a377ec83bb0..c40bdb2c8e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -64,6 +64,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -216,6 +217,47 @@ class JobExceptionsHandlerTest {
 assertThat(response.getExceptionHistory().isTruncated()).isFalse();
 }
 
+@Test
+void testWithExceptionHistoryAndConcurrentGlobalFailure()
+throws HandlerRequestException, ExecutionException, 
InterruptedException {
+final ExceptionHistoryEntry otherFailure =
+ExceptionHistoryEntry.createGlobal(
+new RuntimeException("exception #1"),
+
CompletableFuture.completedFuture(Collections.emptyMap()));
+final RootExceptionHistoryEntry rootCause =
+fromGlobalFailure(
+new RuntimeException("exception #0"),
+System.currentTimeMillis(),
+Collections.singleton(otherFailure));
+
+final ExecutionGraphInfo executionGraphInfo = 
createExecutionGraphInfo(rootCause);
+final HandlerRequest request =
+createRequest(executionGraphInfo.getJobId(), 10);
+final JobExceptionsInfoWithHistory response =
+testInstance.handleRequest(request, executionGraphInfo);
+
+assertThat(response.getExceptionHistory().getEntries())
+.hasSize(1)
+.satisfies(
+matching(
+contains(
+historyContainsGlobalFailure(
+rootCause.getException(),
+rootCause.getTimestamp(),
+matchesFailure(
+
otherFailure.getException(),
+
otherFailure.getTimestamp(),
+
otherFailure.getFailureLabelsFuture(),
+
otherFailure.getFailingTaskName(),
+
JobExceptionsHandler.toString(
+otherFailure
+
.getTaskManagerLocation()),
+
JobExceptionsHandler.toTaskManagerId(
+otherFailure
+