XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r570012364
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ########## @@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); } + @Test + public void testExceptionHistoryWithRestartableFailure() { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); + final JobID jobId = jobGraph.getJobID(); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + // initiate restartable failure + final ExecutionAttemptID restartableAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + final RuntimeException restartableException = new RuntimeException("restartable exception"); + Range<Long> updateStateTriggeringRestartTimeframe = + initiateFailure(scheduler, jobId, restartableAttemptId, restartableException); + + taskRestartExecutor.triggerNonPeriodicScheduledTask(); + + // initiate job failure + testRestartBackoffTimeStrategy.setCanRestart(false); + + final ExecutionAttemptID failingAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + final RuntimeException failingException = new RuntimeException("failing exception"); + Range<Long> updateStateTriggeringJobFailureTimeframe = + initiateFailure(scheduler, jobId, failingAttemptId, failingException); + + List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory(); + assertThat(actualExceptionHistory.size(), is(2)); + + // assert restarted attempt + ErrorInfo restartableFailure = actualExceptionHistory.get(0); + assertThat( + restartableFailure + .getException() + .deserializeError(ClassLoader.getSystemClassLoader()), + is(restartableException)); + assertThat( + restartableFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint())); + assertThat( + restartableFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint())); + + // assert job failure attempt + ErrorInfo globalFailure = actualExceptionHistory.get(1); + Throwable actualException = + globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader()); + assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class)); + assertThat(actualException.getCause(), is(failingException)); + assertThat( + globalFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint())); + assertThat( + globalFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint())); + } + + private static Range<Long> initiateFailure( + DefaultScheduler scheduler, + JobID jobId, + ExecutionAttemptID executionAttemptID, + Throwable exception) { + long start = System.currentTimeMillis(); Review comment: Thanks for clarification. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org