[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493806#comment-15493806 ]
ASF GitHub Bot commented on FLINK-4615: --------------------------------------- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79003843 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java --- @@ -82,43 +82,46 @@ protected void initialize() throws Exception { public void run() throws Exception { SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey()); + try { + while (this.running && !terminationRequested()) { - while (this.running && !terminationRequested()) { + if (log.isInfoEnabled()) { + log.info(formatLogString("starting iteration [" + currentIteration() + "]")); + } - if (log.isInfoEnabled()) { - log.info(formatLogString("starting iteration [" + currentIteration() + "]")); - } + super.run(); - super.run(); + // check if termination was requested + verifyEndOfSuperstepState(); - // check if termination was requested - verifyEndOfSuperstepState(); + if (isWorksetUpdate && isWorksetIteration) { + long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); + worksetAggregator.aggregate(numCollected); + } - if (isWorksetUpdate && isWorksetIteration) { - long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset(); - worksetAggregator.aggregate(numCollected); - } - - if (log.isInfoEnabled()) { - log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); - } - - // let the successors know that the end of this superstep data is reached - sendEndOfSuperstep(); - - if (isWorksetUpdate) { - // notify iteration head if responsible for workset update - worksetBackChannel.notifyOfEndOfSuperstep(); - } - - boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); + if (log.isInfoEnabled()) { + log.info(formatLogString("finishing iteration [" + currentIteration() + "]")); + } - if (terminated) { - requestTermination(); - } - else { - incrementIterationCounter(); + // let the successors know that the end of this superstep data is reached + sendEndOfSuperstep(); + + if (isWorksetUpdate) { + // notify iteration head if responsible for workset update + worksetBackChannel.notifyOfEndOfSuperstep(); + } + + boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1); + + if (terminated) { + requestTermination(); + this.driver.cleanup(); --- End diff -- `cleanup` will be called twice at the last step of the iteration, because it is also called in the `finally` block. This might potentially cause problems if one of the drivers assumes that `cleanup` will only be called once. > Reusing the memory allocated for the drivers and iterators > ---------------------------------------------------------- > > Key: FLINK-4615 > URL: https://issues.apache.org/jira/browse/FLINK-4615 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime > Reporter: ramkrishna.s.vasudevan > Assignee: ramkrishna.s.vasudevan > Fix For: 1.0.0 > > > Raising as a subtask so that individually can be committed and for better > closer reviews. -- This message was sent by Atlassian JIRA (v6.3.4#6332)