This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f4c945cb9ca882ae485c2e58c74825938f154119
Author: Panagiotis Garefalakis <pgarefala...@confluent.io>
AuthorDate: Wed Mar 27 22:23:48 2024 -0700

    [FLINK-34922] Adds ITCase for GlobalFailureOnRestart
    
    Add an ITCase in which a global failure is triggered while the scheduler is
    restarting, and assert that this failure is handled such that it can be
    retrieved via the REST API.
---
 .../test/scheduling/AdaptiveSchedulerITCase.java   | 197 +++++++++++++++++++--
 1 file changed, 183 insertions(+), 14 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index 2836ff82371..f6c31658f7d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.test.scheduling;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
@@ -31,9 +34,20 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+import 
org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo;
 import 
org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -48,8 +62,10 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,11 +78,13 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -270,25 +288,78 @@ public class AdaptiveSchedulerITCase extends TestLogger {
         final JobClient jobClient = env.executeAsync();
         CommonTestUtils.waitUntilCondition(
                 () -> {
-                    final RestClusterClient<?> restClusterClient =
-                            
MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient();
-                    final JobExceptionsMessageParameters params =
-                            new JobExceptionsMessageParameters();
-                    params.jobPathParameter.resolve(jobClient.getJobID());
-                    final CompletableFuture<JobExceptionsInfoWithHistory> 
exceptionsFuture =
-                            restClusterClient.sendRequest(
-                                    JobExceptionsHeaders.getInstance(),
-                                    params,
-                                    EmptyRequestBody.getInstance());
-                    final JobExceptionsInfoWithHistory 
jobExceptionsInfoWithHistory =
-                            exceptionsFuture.get();
-                    return 
jobExceptionsInfoWithHistory.getExceptionHistory().getEntries().size()
-                            > 0;
+                    final List<RootExceptionInfo> exceptions =
+                            getJobExceptions(
+                                            jobClient.getJobID(), 
MINI_CLUSTER_WITH_CLIENT_RESOURCE)
+                                    .get()
+                                    .getExceptionHistory()
+                                    .getEntries();
+                    return !exceptions.isEmpty();
                 });
         jobClient.cancel().get();
         CommonTestUtils.waitForJobStatus(jobClient, 
Collections.singletonList(JobStatus.CANCELED));
     }
 
+    @Test
+    public void testGlobalFailureOnRestart() throws Exception {
+        final MiniCluster miniCluster = 
MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster();
+
+        final JobVertexID jobVertexId = new JobVertexID();
+        final JobVertex jobVertex = new JobVertex("jobVertex", jobVertexId);
+        jobVertex.setInvokableClass(FailingInvokable.class);
+        jobVertex.addOperatorCoordinator(
+                new SerializedValue<>(
+                        new 
FailingCoordinatorProvider(OperatorID.fromJobVertexID(jobVertexId))));
+        jobVertex.setParallelism(1);
+
+        final ExecutionConfig executionConfig = new ExecutionConfig();
+        
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
Time.hours(1)));
+
+        final JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .addJobVertices(Collections.singletonList(jobVertex))
+                        .setExecutionConfig(executionConfig)
+                        .build();
+        miniCluster.submitJob(jobGraph).join();
+
+        // We rely on waiting in restarting state (see the restart strategy 
above)
+        CommonTestUtils.waitUntilCondition(
+                () -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == 
JobStatus.RESTARTING);
+        FailingCoordinatorProvider.JOB_RESTARTING.countDown();
+
+        assertThatFuture(getJobExceptions(jobGraph.getJobID(), 
MINI_CLUSTER_WITH_CLIENT_RESOURCE))
+                .eventuallySucceeds();
+
+        miniCluster.cancelJob(jobGraph.getJobID());
+        CommonTestUtils.waitUntilCondition(
+                () -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == 
JobStatus.CANCELED);
+
+        final JobExceptionsInfoWithHistory jobExceptions =
+                getJobExceptions(jobGraph.getJobID(), 
MINI_CLUSTER_WITH_CLIENT_RESOURCE).get();
+
+        // there should be exactly 1 root exception in the history from the 
failing vertex,
+        // as the global coordinator failure should be treated as a concurrent 
exception
+        Assertions.assertThat(jobExceptions.getExceptionHistory().getEntries())
+                .hasSize(1)
+                .allSatisfy(
+                        rootExceptionInfo ->
+                                
Assertions.assertThat(rootExceptionInfo.getStacktrace())
+                                        
.contains(FailingInvokable.localExceptionMsg)
+                                        .doesNotContain(
+                                                
FailingCoordinatorProvider.globalExceptionMsg))
+                .allSatisfy(
+                        rootExceptionInfo ->
+                                
Assertions.assertThat(rootExceptionInfo.getConcurrentExceptions())
+                                        .anySatisfy(
+                                                exceptionInfo ->
+                                                        Assertions.assertThat(
+                                                                        
exceptionInfo
+                                                                               
 .getStacktrace())
+                                                                .contains(
+                                                                        
FailingCoordinatorProvider
+                                                                               
 .globalExceptionMsg)));
+    }
+
     private boolean isDirectoryEmpty(File directory) {
         File[] files = directory.listFiles();
         if (files.length > 0) {
@@ -312,6 +383,104 @@ public class AdaptiveSchedulerITCase extends TestLogger {
         return env;
     }
 
+    private static CompletableFuture<JobExceptionsInfoWithHistory> 
getJobExceptions(
+            JobID jobId, MiniClusterWithClientResource minClusterRes) throws 
Exception {
+        final RestClusterClient<?> restClusterClient = 
minClusterRes.getRestClusterClient();
+        final JobExceptionsMessageParameters params = new 
JobExceptionsMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+        return restClusterClient.sendRequest(
+                JobExceptionsHeaders.getInstance(), params, 
EmptyRequestBody.getInstance());
+    }
+
+    /** Simple invokable which fails immediately after being invoked. */
+    public static class FailingInvokable extends AbstractInvokable {
+        private static final String localExceptionMsg = "Local exception.";
+
+        public FailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            throw new Exception(localExceptionMsg);
+        }
+    }
+
+    private static class FailingCoordinatorProvider implements 
OperatorCoordinator.Provider {
+
+        private static final CountDownLatch JOB_RESTARTING = new 
CountDownLatch(1);
+
+        private final OperatorID operatorId;
+        private static final String globalExceptionMsg = "Global exception.";
+
+        FailingCoordinatorProvider(OperatorID operatorId) {
+            this.operatorId = operatorId;
+        }
+
+        @Override
+        public OperatorID getOperatorId() {
+            return operatorId;
+        }
+
+        @Override
+        public OperatorCoordinator create(OperatorCoordinator.Context context) 
{
+            return new OperatorCoordinator() {
+
+                @Nullable private Thread thread;
+
+                @Override
+                public void start() {
+                    thread =
+                            new Thread(
+                                    () -> {
+                                        try {
+                                            JOB_RESTARTING.await();
+                                            context.failJob(new 
Exception(globalExceptionMsg));
+                                        } catch (InterruptedException e) {
+                                            Thread.currentThread().interrupt();
+                                        }
+                                    });
+                    thread.setName(AdaptiveSchedulerITCase.class + 
"_failing-coordinator");
+                    thread.setDaemon(true);
+                    thread.start();
+                }
+
+                @Override
+                public void close() throws Exception {
+                    if (thread != null) {
+                        thread.interrupt();
+                        thread.join();
+                    }
+                }
+
+                @Override
+                public void handleEventFromOperator(
+                        int subtask, int attemptNumber, OperatorEvent event) {}
+
+                @Override
+                public void checkpointCoordinator(
+                        long checkpointId, CompletableFuture<byte[]> 
resultFuture) {}
+
+                @Override
+                public void notifyCheckpointComplete(long checkpointId) {}
+
+                @Override
+                public void resetToCheckpoint(long checkpointId, @Nullable 
byte[] checkpointData) {}
+
+                @Override
+                public void subtaskReset(int subtask, long checkpointId) {}
+
+                @Override
+                public void executionAttemptFailed(
+                        int subtask, int attemptNumber, @Nullable Throwable 
reason) {}
+
+                @Override
+                public void executionAttemptReady(
+                        int subtask, int attemptNumber, SubtaskGateway 
gateway) {}
+            };
+        }
+    }
+
     private static final class DummySource extends 
RichParallelSourceFunction<Integer>
             implements CheckpointedFunction, CheckpointListener {
         private final StopWithSavepointTestBehavior behavior;

Reply via email to