Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79003843 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java --- @@ -82,43 +82,46 @@ protected void initialize() throws Exception { public void run() throws Exception { SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey()); + try { + while (this.running && !terminationRequested()) { - while (this.running && !terminationRequested()) { + if (log.isInfoEnabled()) { + log.info(formatLogString("starting iteration [" + currentIteration() + "]")); + } - if (log.isInfoEnabled()) { - log.info(formatLogString("starting iteration [" + currentIteration() + "]")); - } + super.run(); - super.run(); + // check if termination was requested + verifyEndOfSuperstepState(); - // check if termination was requested - verifyEndOfSuperstepState(); + if (isWorksetUpdate && isWorksetIteration) { + long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); + worksetAggregator.aggregate(numCollected); + } - if (isWorksetUpdate && isWorksetIteration) { - long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); - worksetAggregator.aggregate(numCollected); - } - - if (log.isInfoEnabled()) { - log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); - } - - // let the successors know that the end of this superstep data is reached - sendEndOfSuperstep(); - - if (isWorksetUpdate) { - // notify iteration head if responsible for workset update - worksetBackChannel.notifyOfEndOfSuperstep(); - } - - boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); + if (log.isInfoEnabled()) { + log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); + } - if (terminated) { - requestTermination(); - } - else { - incrementIterationCounter(); + // let the successors know 
that the end of this superstep data is reached + sendEndOfSuperstep(); + + if (isWorksetUpdate) { + // notify iteration head if responsible for workset update + worksetBackChannel.notifyOfEndOfSuperstep(); + } + + boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); + + if (terminated) { + requestTermination(); + this.driver.cleanup(); --- End diff -- `cleanup` will be called twice at the last step of the iteration: once here, right after `requestTermination()`, and again in the `finally` block. This could cause problems if one of the drivers assumes that `cleanup` is called only once.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---