[
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493806#comment-15493806
]
ASF GitHub Bot commented on FLINK-4615:
---------------------------------------
Github user ggevay commented on a diff in the pull request:
https://github.com/apache/flink/pull/2496#discussion_r79003843
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---
@@ -82,43 +82,46 @@ protected void initialize() throws Exception {
public void run() throws Exception {
SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+ try {
+ while (this.running && !terminationRequested()) {
- while (this.running && !terminationRequested()) {
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+ }
- if (log.isInfoEnabled()) {
- log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
- }
+ super.run();
- super.run();
+ // check if termination was requested
+ verifyEndOfSuperstepState();
- // check if termination was requested
- verifyEndOfSuperstepState();
+ if (isWorksetUpdate && isWorksetIteration) {
+ long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
+ worksetAggregator.aggregate(numCollected);
+ }
- if (isWorksetUpdate && isWorksetIteration) {
- long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
- worksetAggregator.aggregate(numCollected);
- }
-
- if (log.isInfoEnabled()) {
- log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
- }
-
- // let the successors know that the end of this superstep data is reached
- sendEndOfSuperstep();
-
- if (isWorksetUpdate) {
- // notify iteration head if responsible for workset update
- worksetBackChannel.notifyOfEndOfSuperstep();
- }
-
- boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+ }
- if (terminated) {
- requestTermination();
- }
- else {
- incrementIterationCounter();
+ // let the successors know that the end of this superstep data is reached
+ sendEndOfSuperstep();
+
+ if (isWorksetUpdate) {
+ // notify iteration head if responsible for workset update
+ worksetBackChannel.notifyOfEndOfSuperstep();
+ }
+
+ boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+ if (terminated) {
+ requestTermination();
+ this.driver.cleanup();
--- End diff --
`cleanup` will be called twice at the last step of the iteration, because
it is also called in the `finally` block. This could cause problems
if any of the drivers assumes that `cleanup` is called only once.
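
To illustrate the concern, here is a minimal, self-contained sketch. It does not use Flink's real classes: `SketchDriver`, `CountingDriver`, `CleanupOnceDriver`, and `runLastSuperstep` are hypothetical names, and the body of the `finally` block is assumed, since it is not visible in this diff hunk. It shows the double call on the terminating superstep, plus one possible guard that forwards only the first `cleanup` call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a driver; NOT Flink's actual Driver interface.
interface SketchDriver {
    void cleanup();
}

// Records how many times cleanup() was invoked.
final class CountingDriver implements SketchDriver {
    int cleanupCalls = 0;

    @Override
    public void cleanup() {
        cleanupCalls++;
    }
}

// One possible guard (an assumption, not the PR's fix):
// forward only the first cleanup() call to the wrapped driver.
final class CleanupOnceDriver implements SketchDriver {
    private final SketchDriver delegate;
    private final AtomicBoolean cleaned = new AtomicBoolean(false);

    CleanupOnceDriver(SketchDriver delegate) {
        this.delegate = delegate;
    }

    @Override
    public void cleanup() {
        if (cleaned.compareAndSet(false, true)) {
            delegate.cleanup();
        }
    }
}

final class DoubleCleanupSketch {
    // Mimics the control flow discussed above for the last superstep:
    // cleanup() on termination, then cleanup() again in finally (assumed).
    static void runLastSuperstep(SketchDriver driver) {
        try {
            boolean terminated = true; // pretend the latch signalled termination
            if (terminated) {
                driver.cleanup();
            }
        } finally {
            driver.cleanup();
        }
    }

    public static void main(String[] args) {
        CountingDriver raw = new CountingDriver();
        runLastSuperstep(raw);
        System.out.println("unguarded cleanup calls: " + raw.cleanupCalls);

        CountingDriver guarded = new CountingDriver();
        runLastSuperstep(new CleanupOnceDriver(guarded));
        System.out.println("guarded cleanup calls: " + guarded.cleanupCalls);
    }
}
```

The other obvious option is to drop the explicit call on termination and rely on the `finally` block alone; which one fits depends on whether the drivers need `cleanup` to run before the loop exits.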
> Reusing the memory allocated for the drivers and iterators
> ----------------------------------------------------------
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
> Issue Type: Sub-task
> Components: Local Runtime
> Reporter: ramkrishna.s.vasudevan
> Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising this as a subtask so that it can be committed individually and
> reviewed more closely.