This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 20c506d76c99c2e6c3f30a039acd0366d3448c87 Author: Panagiotis Garefalakis <pgarefala...@confluent.io> AuthorDate: Wed Mar 27 22:23:48 2024 -0700 [FLINK-34922] Adds ITCase for GlobalFailureOnRestart Add an ITCase where a global failure is triggered while the scheduler is restarting, and assert that this failure is handled such that it can be retrieved via the REST API. --- .../test/scheduling/AdaptiveSchedulerITCase.java | 197 +++++++++++++++++++-- 1 file changed, 183 insertions(+), 14 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java index a4124dfe08c..d15f3ae7ef4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java @@ -18,11 +18,14 @@ package org.apache.flink.test.scheduling; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; @@ -30,9 +33,20 @@ import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import 
org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -48,8 +62,10 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -62,11 +78,13 @@ import javax.annotation.Nullable; import java.io.File; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; import static 
org.apache.flink.util.ExceptionUtils.assertThrowable; import static org.hamcrest.CoreMatchers.containsString; @@ -266,25 +284,78 @@ public class AdaptiveSchedulerITCase extends TestLogger { final JobClient jobClient = env.executeAsync(); CommonTestUtils.waitUntilCondition( () -> { - final RestClusterClient<?> restClusterClient = - MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient(); - final JobExceptionsMessageParameters params = - new JobExceptionsMessageParameters(); - params.jobPathParameter.resolve(jobClient.getJobID()); - final CompletableFuture<JobExceptionsInfoWithHistory> exceptionsFuture = - restClusterClient.sendRequest( - JobExceptionsHeaders.getInstance(), - params, - EmptyRequestBody.getInstance()); - final JobExceptionsInfoWithHistory jobExceptionsInfoWithHistory = - exceptionsFuture.get(); - return jobExceptionsInfoWithHistory.getExceptionHistory().getEntries().size() - > 0; + final List<RootExceptionInfo> exceptions = + getJobExceptions( + jobClient.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE) + .get() + .getExceptionHistory() + .getEntries(); + return !exceptions.isEmpty(); }); jobClient.cancel().get(); CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.CANCELED)); } + @Test + public void testGlobalFailureOnRestart() throws Exception { + final MiniCluster miniCluster = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster(); + + final JobVertexID jobVertexId = new JobVertexID(); + final JobVertex jobVertex = new JobVertex("jobVertex", jobVertexId); + jobVertex.setInvokableClass(FailingInvokable.class); + jobVertex.addOperatorCoordinator( + new SerializedValue<>( + new FailingCoordinatorProvider(OperatorID.fromJobVertexID(jobVertexId)))); + jobVertex.setParallelism(1); + + final ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.hours(1))); + + final JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder() + 
.addJobVertices(Collections.singletonList(jobVertex)) + .setExecutionConfig(executionConfig) + .build(); + miniCluster.submitJob(jobGraph).join(); + + // We rely on waiting in restarting state (see the restart strategy above) + CommonTestUtils.waitUntilCondition( + () -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == JobStatus.RESTARTING); + FailingCoordinatorProvider.JOB_RESTARTING.countDown(); + + assertThatFuture(getJobExceptions(jobGraph.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE)) + .eventuallySucceeds(); + + miniCluster.cancelJob(jobGraph.getJobID()); + CommonTestUtils.waitUntilCondition( + () -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == JobStatus.CANCELED); + + final JobExceptionsInfoWithHistory jobExceptions = + getJobExceptions(jobGraph.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE).get(); + + // there should be exactly 1 root exception in the history from the failing vertex, + // as the global coordinator failure should be treated as a concurrent exception + Assertions.assertThat(jobExceptions.getExceptionHistory().getEntries()) + .hasSize(1) + .allSatisfy( + rootExceptionInfo -> + Assertions.assertThat(rootExceptionInfo.getStacktrace()) + .contains(FailingInvokable.localExceptionMsg) + .doesNotContain( + FailingCoordinatorProvider.globalExceptionMsg)) + .allSatisfy( + rootExceptionInfo -> + Assertions.assertThat(rootExceptionInfo.getConcurrentExceptions()) + .anySatisfy( + exceptionInfo -> + Assertions.assertThat( + exceptionInfo + .getStacktrace()) + .contains( + FailingCoordinatorProvider + .globalExceptionMsg))); + } + private boolean isDirectoryEmpty(File directory) { File[] files = directory.listFiles(); if (files.length > 0) { @@ -306,6 +377,104 @@ public class AdaptiveSchedulerITCase extends TestLogger { return env; } + private static CompletableFuture<JobExceptionsInfoWithHistory> getJobExceptions( + JobID jobId, MiniClusterWithClientResource minClusterRes) throws Exception { + final RestClusterClient<?> 
restClusterClient = minClusterRes.getRestClusterClient(); + final JobExceptionsMessageParameters params = new JobExceptionsMessageParameters(); + params.jobPathParameter.resolve(jobId); + return restClusterClient.sendRequest( + JobExceptionsHeaders.getInstance(), params, EmptyRequestBody.getInstance()); + } + + /** Simple invokable which fails immediately after being invoked. */ + public static class FailingInvokable extends AbstractInvokable { + private static final String localExceptionMsg = "Local exception."; + + public FailingInvokable(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + throw new Exception(localExceptionMsg); + } + } + + private static class FailingCoordinatorProvider implements OperatorCoordinator.Provider { + + private static final CountDownLatch JOB_RESTARTING = new CountDownLatch(1); + + private final OperatorID operatorId; + private static final String globalExceptionMsg = "Global exception."; + + FailingCoordinatorProvider(OperatorID operatorId) { + this.operatorId = operatorId; + } + + @Override + public OperatorID getOperatorId() { + return operatorId; + } + + @Override + public OperatorCoordinator create(OperatorCoordinator.Context context) { + return new OperatorCoordinator() { + + @Nullable private Thread thread; + + @Override + public void start() { + thread = + new Thread( + () -> { + try { + JOB_RESTARTING.await(); + context.failJob(new Exception(globalExceptionMsg)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + thread.setName(AdaptiveSchedulerITCase.class + "_failing-coordinator"); + thread.setDaemon(true); + thread.start(); + } + + @Override + public void close() throws Exception { + if (thread != null) { + thread.interrupt(); + thread.join(); + } + } + + @Override + public void handleEventFromOperator( + int subtask, int attemptNumber, OperatorEvent event) {} + + @Override + public void checkpointCoordinator( + long 
checkpointId, CompletableFuture<byte[]> resultFuture) {} + + @Override + public void notifyCheckpointComplete(long checkpointId) {} + + @Override + public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) {} + + @Override + public void subtaskReset(int subtask, long checkpointId) {} + + @Override + public void executionAttemptFailed( + int subtask, int attemptNumber, @Nullable Throwable reason) {} + + @Override + public void executionAttemptReady( + int subtask, int attemptNumber, SubtaskGateway gateway) {} + }; + } + } + private static final class DummySource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction, CheckpointListener { private final StopWithSavepointTestBehavior behavior;