(flink) 01/02: [FLINK-34922][rest] Support concurrent global failure
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fcb581f0039f9704b6eaf15a2fabaa4e05d79048
Author: Chesnay Schepler
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

    [FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java     | 12 +
 .../rest/handler/job/JobExceptionsHandlerTest.java | 53 +-
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 5ece82a2671..84140c8c007 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -248,6 +248,18 @@ public class JobExceptionsHandler
 
     private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
             ExceptionHistoryEntry exceptionHistoryEntry) {
+
+        if (exceptionHistoryEntry.isGlobal()) {
+            return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                    exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+                    exceptionHistoryEntry.getExceptionAsString(),
+                    exceptionHistoryEntry.getTimestamp(),
+                    exceptionHistoryEntry.getFailureLabels(),
+                    null,
+                    null,
+                    null);
+        }
+
         assertLocalExceptionInfo(exceptionHistoryEntry);
 
         return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index efce7903686..c7699c6f951 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
+import org.assertj.core.api.Assertions;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
@@ -64,6 +65,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +75,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.assertj.core.api.HamcrestCondition.matching;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.collection.IsEmptyCollection.empty;
@@ -214,6 +217,47 @@ public class JobExceptionsHandlerTest extends TestLogger {
         assertFalse(response.getExceptionHistory().isTruncated());
     }
 
+    @Test
+    public void testWithExceptionHistoryAndConcurrentGlobalFailure()
+            throws HandlerRequestException, ExecutionException, InterruptedException {
+        final ExceptionHistoryEntry otherFailure =
+                ExceptionHistoryEntry.createGlobal(
+                        new RuntimeException("exception #1"),
+                        CompletableFuture.completedFuture(Collections.emptyMap()));
+        final RootExceptionHistoryEntry rootCause =
+                fromGlobalFailure(
+                        new RuntimeException("exception #0"),
+                        System.currentTimeMillis(),
+                        Collections.singleton(otherFailure));
+
+        final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause);
+        final HandlerRequest request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        Assertions.assertThat(response.getExceptionHistory().getEntries())
+                .hasSize(1)
+                .satisfies(
+                        matching(
+                                contains(
+                                        historyContainsGlobalFailure(
+                                                rootCause.getException(),
+                                                rootCause.getTimestamp(),
+                                                matchesFailure(
(flink) 01/02: [FLINK-34922][rest] Support concurrent global failure
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faa880c703cadba4521fc8ef885a242ded4b2ac7
Author: Chesnay Schepler
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

    [FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java     | 13 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 51 +-
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 55c7875e85c..6d5f49d55b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -249,6 +249,19 @@ public class JobExceptionsHandler
 
     private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
             ExceptionHistoryEntry exceptionHistoryEntry) {
+
+        if (exceptionHistoryEntry.isGlobal()) {
+            return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                    exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+                    exceptionHistoryEntry.getExceptionAsString(),
+                    exceptionHistoryEntry.getTimestamp(),
+                    exceptionHistoryEntry.getFailureLabels(),
+                    null,
+                    null,
+                    null,
+                    null);
+        }
+
         assertLocalExceptionInfo(exceptionHistoryEntry);
 
         return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index f5354e5ce90..761881d1624 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -63,6 +63,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -215,6 +216,47 @@ class JobExceptionsHandlerTest {
         assertThat(response.getExceptionHistory().isTruncated()).isFalse();
     }
 
+    @Test
+    void testWithExceptionHistoryAndConcurrentGlobalFailure()
+            throws HandlerRequestException, ExecutionException, InterruptedException {
+        final ExceptionHistoryEntry otherFailure =
+                ExceptionHistoryEntry.createGlobal(
+                        new RuntimeException("exception #1"),
+                        CompletableFuture.completedFuture(Collections.emptyMap()));
+        final RootExceptionHistoryEntry rootCause =
+                fromGlobalFailure(
+                        new RuntimeException("exception #0"),
+                        System.currentTimeMillis(),
+                        Collections.singleton(otherFailure));
+
+        final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause);
+        final HandlerRequest request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response.getExceptionHistory().getEntries())
+                .hasSize(1)
+                .satisfies(
+                        matching(
+                                contains(
+                                        historyContainsGlobalFailure(
+                                                rootCause.getException(),
+                                                rootCause.getTimestamp(),
+                                                matchesFailure(
+                                                        otherFailure.getException(),
+                                                        otherFailure.getTimestamp(),
+                                                        otherFailure.getFailureLabelsFuture(),
+                                                        otherFailure.getFailingTaskName(),
+                                                        JobExceptionsHandler.toString(
+                                                                otherFailure
+                                                                        .getTaskManagerLocation()),
+                                                        JobExceptionsHandler.toTaskManagerId(
+                                                                otherFailure
(flink) 01/02: [FLINK-34922][rest] Support concurrent global failure
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc957bfdc3aa6a8e3bce603cfc68c5c553c72220
Author: Chesnay Schepler
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

    [FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java     | 13 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 51 +-
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 55c7875e85c..6d5f49d55b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -249,6 +249,19 @@ public class JobExceptionsHandler
 
     private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
             ExceptionHistoryEntry exceptionHistoryEntry) {
+
+        if (exceptionHistoryEntry.isGlobal()) {
+            return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                    exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+                    exceptionHistoryEntry.getExceptionAsString(),
+                    exceptionHistoryEntry.getTimestamp(),
+                    exceptionHistoryEntry.getFailureLabels(),
+                    null,
+                    null,
+                    null,
+                    null);
+        }
+
         assertLocalExceptionInfo(exceptionHistoryEntry);
 
         return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index a377ec83bb0..c40bdb2c8e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -64,6 +64,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -216,6 +217,47 @@ class JobExceptionsHandlerTest {
         assertThat(response.getExceptionHistory().isTruncated()).isFalse();
     }
 
+    @Test
+    void testWithExceptionHistoryAndConcurrentGlobalFailure()
+            throws HandlerRequestException, ExecutionException, InterruptedException {
+        final ExceptionHistoryEntry otherFailure =
+                ExceptionHistoryEntry.createGlobal(
+                        new RuntimeException("exception #1"),
+                        CompletableFuture.completedFuture(Collections.emptyMap()));
+        final RootExceptionHistoryEntry rootCause =
+                fromGlobalFailure(
+                        new RuntimeException("exception #0"),
+                        System.currentTimeMillis(),
+                        Collections.singleton(otherFailure));
+
+        final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause);
+        final HandlerRequest request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(response.getExceptionHistory().getEntries())
+                .hasSize(1)
+                .satisfies(
+                        matching(
+                                contains(
+                                        historyContainsGlobalFailure(
+                                                rootCause.getException(),
+                                                rootCause.getTimestamp(),
+                                                matchesFailure(
+                                                        otherFailure.getException(),
+                                                        otherFailure.getTimestamp(),
+                                                        otherFailure.getFailureLabelsFuture(),
+                                                        otherFailure.getFailingTaskName(),
+                                                        JobExceptionsHandler.toString(
+                                                                otherFailure
+                                                                        .getTaskManagerLocation()),
+                                                        JobExceptionsHandler.toTaskManagerId(
+                                                                otherFailure