(flink) 02/02: [FLINK-34922] Adds ITCase for GlobalFailureOnRestart
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit 20c506d76c99c2e6c3f30a039acd0366d3448c87 Author: Panagiotis Garefalakis AuthorDate: Wed Mar 27 22:23:48 2024 -0700 [FLINK-34922] Adds ITCase for GlobalFailureOnRestart Add an ITCase where a global failure is triggered while the scheduler is restarting, and asserts that this failure is handled such that it can be retrieved via the REST API. --- .../test/scheduling/AdaptiveSchedulerITCase.java | 197 +++-- 1 file changed, 183 insertions(+), 14 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java index a4124dfe08c..d15f3ae7ef4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java @@ -18,11 +18,14 @@ package org.apache.flink.test.scheduling; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; @@ -30,9 +33,20 @@ import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; +import 
org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -48,8 +62,10 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -62,11 +78,13 @@ import javax.annotation.Nullable; import java.io.File; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; import static org.apache.flink.util.ExceptionUtils.assertThrowable; import static org.hamcrest.CoreMatchers.containsString; @@ -266,25 +284,78 @@ public class AdaptiveSchedulerITCase extends TestLogger { final JobClient jobClient = env.executeAsync(); CommonTestUtils.waitUntilCondition( () -> { -final RestClusterClient restClusterClient = - MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient(); -final JobExceptionsMessageParameters params = -new JobExceptionsMessageParameters(); -params.jobPathParameter.resolve(jobClient.getJobID()); -final CompletableFuture exceptionsFuture = -restClusterClient.sendRequest( -
(flink) 02/02: [FLINK-34922] Adds ITCase for GlobalFailureOnRestart
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit b54edc886ce5a533bafe74fa3629657b6266cad5 Author: Panagiotis Garefalakis AuthorDate: Wed Mar 27 22:23:48 2024 -0700 [FLINK-34922] Adds ITCase for GlobalFailureOnRestart Add an ITCase where a global failure is triggered while the scheduler is restarting, and asserts that this failure is handled such that it can be retrieved via the REST API. --- .../test/scheduling/AdaptiveSchedulerITCase.java | 197 +++-- 1 file changed, 183 insertions(+), 14 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java index 0cbfdd950bd..e63b2891039 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java @@ -18,11 +18,14 @@ package org.apache.flink.test.scheduling; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; @@ -30,9 +33,20 @@ import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; +import 
org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -48,8 +62,10 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -62,11 +78,13 @@ import javax.annotation.Nullable; import java.io.File; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; import static org.apache.flink.util.ExceptionUtils.assertThrowable; import static org.hamcrest.CoreMatchers.containsString; @@ -270,25 +288,78 @@ public class AdaptiveSchedulerITCase extends TestLogger { final JobClient jobClient = env.executeAsync(); CommonTestUtils.waitUntilCondition( () -> { -final RestClusterClient restClusterClient = - MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient(); -final JobExceptionsMessageParameters params = -new JobExceptionsMessageParameters(); -params.jobPathParameter.resolve(jobClient.getJobID()); -final CompletableFuture exceptionsFuture = -restClusterClient.sendRequest( -
(flink) 02/02: [FLINK-34922] Adds ITCase for GlobalFailureOnRestart
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f4c945cb9ca882ae485c2e58c74825938f154119 Author: Panagiotis Garefalakis AuthorDate: Wed Mar 27 22:23:48 2024 -0700 [FLINK-34922] Adds ITCase for GlobalFailureOnRestart Add an ITCase where a global failure is triggered while the scheduler is restarting, and asserts that this failure is handled such that it can be retrieved via the REST API. --- .../test/scheduling/AdaptiveSchedulerITCase.java | 197 +++-- 1 file changed, 183 insertions(+), 14 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java index 2836ff82371..f6c31658f7d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java @@ -18,11 +18,14 @@ package org.apache.flink.test.scheduling; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; @@ -31,9 +34,20 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; +import 
org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -48,8 +62,10 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -62,11 +78,13 @@ import javax.annotation.Nullable; import java.io.File; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; import static org.apache.flink.util.ExceptionUtils.assertThrowable; import static org.hamcrest.CoreMatchers.containsString; @@ -270,25 +288,78 @@ public class AdaptiveSchedulerITCase extends TestLogger { final JobClient jobClient = env.executeAsync(); CommonTestUtils.waitUntilCondition( () -> { -final RestClusterClient restClusterClient = - MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient(); -final JobExceptionsMessageParameters params = -new JobExceptionsMessageParameters(); -params.jobPathParameter.resolve(jobClient.getJobID()); -final CompletableFuture exceptionsFuture = -restClusterClient.sendRequest( -