[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493841#comment-15493841
 ] 

ASF GitHub Bot commented on FLINK-4615:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79006658
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---
    @@ -82,43 +82,46 @@ protected void initialize() throws Exception {
        public void run() throws Exception {
                
                SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
    +           try {
    +                   while (this.running && !terminationRequested()) {
     
    -           while (this.running && !terminationRequested()) {
    +                           if (log.isInfoEnabled()) {
    +                                   log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    +                           }
     
    -                   if (log.isInfoEnabled()) {
    -                           log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    -                   }
    +                           super.run();
     
    -                   super.run();
    +                           // check if termination was requested
    +                           verifyEndOfSuperstepState();
     
    -                   // check if termination was requested
    -                   verifyEndOfSuperstepState();
    +                           if (isWorksetUpdate && isWorksetIteration) {
    +                                   long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    +                                   worksetAggregator.aggregate(numCollected);
    +                           }
     
    -                   if (isWorksetUpdate && isWorksetIteration) {
    -                           long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    -                           worksetAggregator.aggregate(numCollected);
    -                   }
    -                   
    -                   if (log.isInfoEnabled()) {
    -                           log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    -                   }
    -                   
    -                   // let the successors know that the end of this superstep data is reached
    -                   sendEndOfSuperstep();
    -                   
    -                   if (isWorksetUpdate) {
    -                           // notify iteration head if responsible for workset update
    -                           worksetBackChannel.notifyOfEndOfSuperstep();
    -                   }
    -                   
    -                   boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +                           if (log.isInfoEnabled()) {
    +                                   log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    +                           }
     
    -                   if (terminated) {
    -                           requestTermination();
    -                   }
    -                   else {
    -                           incrementIterationCounter();
    +                           // let the successors know that the end of this superstep data is reached
    +                           sendEndOfSuperstep();
    +
    +                           if (isWorksetUpdate) {
    +                                   // notify iteration head if responsible for workset update
    +                                   worksetBackChannel.notifyOfEndOfSuperstep();
    +                           }
    +
    +                           boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +
    +                           if (terminated) {
    +                                   requestTermination();
    +                                   this.driver.cleanup();
    --- End diff --
    
    I'm very sorry, that was an oversight. I forgot to remove the cleanup() call after termination.


> Reusing the memory allocated for the drivers and iterators
> ----------------------------------------------------------
>
>                 Key: FLINK-4615
>                 URL: https://issues.apache.org/jira/browse/FLINK-4615
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: ramkrishna.s.vasudevan
>            Assignee: ramkrishna.s.vasudevan
>             Fix For: 1.0.0
>
>
> Raising this as a subtask so that it can be committed individually and reviewed more closely.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
