Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79003843
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
 ---
    @@ -82,43 +82,46 @@ protected void initialize() throws Exception {
        public void run() throws Exception {
                
                SuperstepKickoffLatch nextSuperstepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
    +           try {
    +                   while (this.running && !terminationRequested()) {
     
    -           while (this.running && !terminationRequested()) {
    +                           if (log.isInfoEnabled()) {
    +                                   log.info(formatLogString("starting 
iteration [" + currentIteration() + "]"));
    +                           }
     
    -                   if (log.isInfoEnabled()) {
    -                           log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
    -                   }
    +                           super.run();
     
    -                   super.run();
    +                           // check if termination was requested
    +                           verifyEndOfSuperstepState();
     
    -                   // check if termination was requested
    -                   verifyEndOfSuperstepState();
    +                           if (isWorksetUpdate && isWorksetIteration) {
    +                                   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
    +                                   
worksetAggregator.aggregate(numCollected);
    +                           }
     
    -                   if (isWorksetUpdate && isWorksetIteration) {
    -                           long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
    -                           worksetAggregator.aggregate(numCollected);
    -                   }
    -                   
    -                   if (log.isInfoEnabled()) {
    -                           log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
    -                   }
    -                   
    -                   // let the successors know that the end of this 
superstep data is reached
    -                   sendEndOfSuperstep();
    -                   
    -                   if (isWorksetUpdate) {
    -                           // notify iteration head if responsible for 
workset update
    -                           worksetBackChannel.notifyOfEndOfSuperstep();
    -                   }
    -                   
    -                   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +                           if (log.isInfoEnabled()) {
    +                                   log.info(formatLogString("finishing 
iteration [" + currentIteration() + "]"));
    +                           }
     
    -                   if (terminated) {
    -                           requestTermination();
    -                   }
    -                   else {
    -                           incrementIterationCounter();
    +                           // let the successors know that the end of this 
superstep data is reached
    +                           sendEndOfSuperstep();
    +
    +                           if (isWorksetUpdate) {
    +                                   // notify iteration head if responsible 
for workset update
    +                                   
worksetBackChannel.notifyOfEndOfSuperstep();
    +                           }
    +
    +                           boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +
    +                           if (terminated) {
    +                                   requestTermination();
    +                                   this.driver.cleanup();
    --- End diff --
    
    `cleanup` will be called twice at the last step of the iteration, because 
it is also called in the `finally` block. This might potentially cause problems 
if one of the drivers assumes that `cleanup` will only be called once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to