XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r570012364



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +874,78 @@ public void 
allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
     }
 
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new 
RuntimeException("restartable exception");
+        Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, 
restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new 
RuntimeException("failing exception");
+        Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, 
failingException);
+
+        List<ErrorInfo> actualExceptionHistory = 
scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                
greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                
lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                
globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, 
org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));
+        assertThat(
+                globalFailure.getTimestamp(),
+                
greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+        assertThat(
+                globalFailure.getTimestamp(),
+                
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+    }
+
+    private static Range<Long> initiateFailure(
+            DefaultScheduler scheduler,
+            JobID jobId,
+            ExecutionAttemptID executionAttemptID,
+            Throwable exception) {
+        long start = System.currentTimeMillis();

Review comment:
       Thanks for clarification.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to