metaswirl commented on a change in pull request #18689: URL: https://github.com/apache/flink/pull/18689#discussion_r805072225
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java ########## @@ -969,10 +979,203 @@ public void testHowToHandleFailureUnrecoverableFailure() throws Exception { new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build(); assertThat( - scheduler - .howToHandleFailure(new SuppressRestartsException(new Exception("test"))) - .canRestart(), - is(false)); + scheduler + .howToHandleFailure( + new SuppressRestartsException(new Exception("test"))) + .canRestart()) + .isFalse(); + } + + static class RunFailedJobListener implements JobStatusListener { + OneShotLatch jobRunning; + OneShotLatch jobFailed; + + public RunFailedJobListener() { + this.jobRunning = new OneShotLatch(); + this.jobFailed = new OneShotLatch(); + } + + @Override + public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) { + if (newJobStatus == JobStatus.RUNNING) { + jobRunning.trigger(); + } + if (newJobStatus == JobStatus.FAILED) { + jobFailed.trigger(); + } + } + + public void waitForRunning() throws InterruptedException { + jobRunning.await(); + } + + public void waitForFailed() throws InterruptedException { + jobFailed.await(); + } + } + + private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests( + int numAvailableSlots, + BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic) + throws Exception { + final JobGraph jobGraph = createJobGraph(); + RunFailedJobListener listener = new RunFailedJobListener(); + List<ExecutionAttemptID> cancelledTasks = new ArrayList<>(); + + final DefaultDeclarativeSlotPool declarativeSlotPool = + createDeclarativeSlotPool(jobGraph.getJobID()); + + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); + + final AdaptiveScheduler scheduler = + new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) + 
.setJobMasterConfiguration(configuration) + .setDeclarativeSlotPool(declarativeSlotPool) + .setJobStatusListener(listener) + .build(); + + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(numAvailableSlots); + taskManagerGateway.setCancelConsumer(cancelledTasks::add); + + singleThreadMainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, numAvailableSlots)), + taskManagerGateway); + }); + listener.waitForRunning(); + + final Iterable<ArchivedExecutionVertex> executionVertices = + scheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices(); + final List<ExecutionAttemptID> attemptIds = + IterableUtils.toStream(executionVertices) + .map(ArchivedExecutionVertex::getCurrentExecutionAttempt) + .map(ArchivedExecution::getAttemptId) + .collect(Collectors.toList()); + Runnable runnable = testLogic.apply(scheduler, attemptIds); + CompletableFuture<Void> runTestLogicFuture = + CompletableFuture.runAsync(runnable, singleThreadMainThreadExecutor); + runTestLogicFuture.get(); + + Consumer<ExecutionAttemptID> canceller = + attemptId -> + scheduler.updateTaskExecutionState( + new TaskExecutionStateTransition( + new TaskExecutionState( + attemptId, ExecutionState.CANCELED, null))); + CompletableFuture<Void> cancelFuture = + CompletableFuture.runAsync( + () -> cancelledTasks.forEach(canceller), singleThreadMainThreadExecutor); + cancelFuture.get(); + listener.waitForFailed(); + + return scheduler.requestJob().getExceptionHistory(); + } + + @Test + public void testExceptionHistoryWithGlobalFailure() throws Exception { + final Exception expectedException = new Exception("Expected Global Exception"); + BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic = + (scheduler, attemptIds) -> { + final ExecutionAttemptID attemptId = 
attemptIds.remove(0); + + return () -> { + scheduler.handleGlobalFailure(expectedException); + scheduler.updateTaskExecutionState( + new TaskExecutionStateTransition( + new TaskExecutionState( + attemptId, ExecutionState.CANCELED, null))); + }; + }; + + final Iterable<RootExceptionHistoryEntry> actualExceptionHistory = + runExceptionHistoryTests(1, testLogic); + + assertThat(actualExceptionHistory).hasSize(1); + + final RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next(); + assertThat(failure.getTaskManagerLocation()).isNull(); + assertThat(failure.getFailingTaskName()).isNull(); + + assertThat(failure.getException().deserializeError(classLoader)) + .isEqualTo(expectedException); + } + + @Test + public void testExceptionHistoryWithTaskFailure() throws Exception { + final Exception expectedException = new Exception("Expected Local Exception"); + final int numAvailableSlots = 4; + BiFunction<AdaptiveScheduler, List<ExecutionAttemptID>, Runnable> testLogic = + (scheduler, attemptIds) -> { + final ExecutionAttemptID attemptId = attemptIds.get(1); + + return () -> + scheduler.updateTaskExecutionState( + new TaskExecutionStateTransition( + new TaskExecutionState( + attemptId, + ExecutionState.FAILED, + expectedException))); + }; + + final Iterable<RootExceptionHistoryEntry> actualExceptionHistory = + runExceptionHistoryTests(numAvailableSlots, testLogic); + + assertThat(actualExceptionHistory).hasSize(1); + + final RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next(); + + assertThat(failure.getException().deserializeError(classLoader)) + .isEqualTo(expectedException); + } + + @Test + public void testExceptionHistoryWithTaskConcurrentFailure() throws Exception { Review comment: This test fails, but the reasons run deep. Please let me know what you think we should do. **Explanation** First part: The first task failure 1. This task is currently in the execution state `DEPLOYING` 2. 
Its `Execution` is marked as failed [here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1122) 3. The `AdaptiveScheduler` transitions to `FAILING` 4. The `DefaultExecutionGraph` calls `failJob` 5. All executions except the first move to `CANCELING` Part two: The second task fails 1. It is already in the `CANCELING` execution state 2. It never reaches the part where the failure info is stored in the `Execution` [here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1122) 3. When we create an `ExceptionHistoryEntry`, we fail [here](https://github.com/metaswirl/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java#L59) Solution proposals 1. We store the failure info on an `Execution` even if it is already canceling (basically, in `processFail`, move the line that stores the `ErrorInfo` on the `Execution` further up). 2. We only store a single local failure in the `RootExceptionHistoryEntry`. I propose we go with option two for now, because I am not sure what other effects changing the `Execution` class would have. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org