GitHub user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128511963
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
                assertEquals(JobStatus.SUSPENDED, eg.getState());
        }
     
    +   @Test
    +   public void testConcurrentLocalFailAndRestart() throws Exception {
    +           final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
    +           eg.setScheduleMode(ScheduleMode.EAGER);
    +           eg.scheduleForExecution();
    +
    +           waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +
    +           final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
    +           final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
    +           final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
    +
    +           final OneShotLatch failTrigger = new OneShotLatch();
    +           final CountDownLatch readyLatch = new CountDownLatch(2);
    +
    +           Thread failure1 = new Thread() {
    +                   @Override
    +                   public void run() {
    +                           readyLatch.countDown();
    +                           try {
    +                                   failTrigger.await();
    +                           } catch (InterruptedException ignored) {}
    +
    +                           first.fail(new Exception("intended test failure 1"));
    +                   }
    +           };
    +
    +           Thread failure2 = new Thread() {
    +                   @Override
    +                   public void run() {
    +                           readyLatch.countDown();
    +                           try {
    +                                   failTrigger.await();
    +                           } catch (InterruptedException ignored) {}
    +
    +                           last.fail(new Exception("intended test failure 2"));
    +                   }
    +           };
    +
    +           // make sure both threads start simultaneously
    +           failure1.start();
    +           failure2.start();
    +           readyLatch.await();
    +           failTrigger.trigger();
    +
    +           waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
    +           completeCancellingForAllVertices(eg);
    +
    +           waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
    +           waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +           finishAllVertices(eg);
    +
    +           eg.waitUntilTerminal();
    +           assertEquals(JobStatus.FINISHED, eg.getState());
    +   }
    +
    +   @Test
    +   public void testConcurrentGlobalFailAndRestarts() throws Exception {
    --- End diff --
    
    From the offline chat: I think you are missing the asynchrony in the restarting, which leads to a deadlock in the cherry-picked code.
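The asynchrony the comment refers to can be illustrated, under loose assumptions, with a minimal Java sketch: assume the failure handler holds a lock while it switches the job to RESTARTING, and that the restart itself must take the same lock. Everything here (the `AsyncRestartSketch` class, its two states, the single-thread `restartExecutor`) is hypothetical and is not Flink's `ExecutionGraph` or `RestartStrategy` code; it only borrows the test's shape of two failures released at once by a latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch, not Flink code: fail() takes a lock, moves the job to
 * RESTARTING, and hands restart() to an executor instead of running it inline,
 * so the restart only runs after the failing thread has released the lock.
 */
public class AsyncRestartSketch {

    public enum State { RUNNING, RESTARTING }

    private final Object lock = new Object();
    private final ExecutorService restartExecutor = Executors.newSingleThreadExecutor();
    private State state = State.RUNNING;
    private int restartCount = 0;

    /** May be called concurrently; only a failure observed while RUNNING schedules a restart. */
    public void fail() {
        synchronized (lock) {
            if (state != State.RUNNING) {
                return; // a concurrent failure already scheduled the restart
            }
            state = State.RESTARTING;
            // Asynchronous hand-off: restart() is queued and runs on the executor thread.
            restartExecutor.execute(this::restart);
        }
    }

    private void restart() {
        synchronized (lock) {
            restartCount++;
            state = State.RUNNING;
        }
    }

    public State state() {
        synchronized (lock) {
            return state;
        }
    }

    public int restartCount() {
        synchronized (lock) {
            return restartCount;
        }
    }

    /** Lets queued restarts finish, then stops the executor. */
    public void shutdown() throws InterruptedException {
        restartExecutor.shutdown();
        if (!restartExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("restart executor did not terminate");
        }
    }

    public static void main(String[] args) throws Exception {
        AsyncRestartSketch job = new AsyncRestartSketch();
        CountDownLatch ready = new CountDownLatch(2);
        CountDownLatch trigger = new CountDownLatch(1);

        Runnable failingTask = () -> {
            ready.countDown();
            try {
                trigger.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            job.fail();
        };

        Thread t1 = new Thread(failingTask);
        Thread t2 = new Thread(failingTask);
        t1.start();
        t2.start();
        ready.await();       // both threads are about to wait on the trigger
        trigger.countDown(); // release them at (roughly) the same time
        t1.join();
        t2.join();
        job.shutdown();

        System.out.println(job.state() + " after " + job.restartCount() + " restart(s)");
    }
}
```

Running the restart inline would execute it while `fail()` still holds the lock; if that restart then waited for work from another thread that also needs the lock, both threads would stall. Whether one or two restarts happen depends on timing, which is why the sketch prints the count rather than assuming a fixed value.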

