[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627811#comment-14627811
 ] 

ASF GitHub Bot commented on FLINK-2292:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/896#discussion_r34662511
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
    @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
         if (!isConnected) {
           log.debug(s"Dropping message $message because the TaskManager is 
currently " +
             "not connected to a JobManager.")
    -    }
    +    } else {
     
    -    // we order the messages by frequency, to make sure the code paths for 
matching
    -    // are as short as possible
    -    message match {
    +      // we order the messages by frequency, to make sure the code paths 
for matching
    +      // are as short as possible
    +      message match {
    +
    +        // tell the task about the availability of a new input partition
    +        case UpdateTaskSinglePartitionInfo(executionID, resultID, 
partitionInfo) =>
    +          updateTaskInputPartitions(executionID, List((resultID, 
partitionInfo)))
    +
    +        // tell the task about the availability of some new input 
partitions
    +        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) 
=>
    +          updateTaskInputPartitions(executionID, partitionInfos)
    +
    +        // discards intermediate result partitions of a task execution on 
this TaskManager
    +        case FailIntermediateResultPartitions(executionID) =>
    +          log.info("Discarding the results produced by task execution " + 
executionID)
    +          if (network.isAssociated) {
    +            try {
    +              
network.getPartitionManager.releasePartitionsProducedBy(executionID)
    +            } catch {
    +              case t: Throwable => killTaskManagerFatal(
    +                "Fatal leak: Unable to release intermediate result 
partition data", t)
    +            }
    +          }
     
    -      // tell the task about the availability of a new input partition
    -      case UpdateTaskSinglePartitionInfo(executionID, resultID, 
partitionInfo) =>
    -        updateTaskInputPartitions(executionID, List((resultID, 
partitionInfo)))
    +        // notifies the TaskManager that the state of a task has changed.
    +        // the TaskManager informs the JobManager and cleans up in case 
the transition
    +        // was into a terminal state, or in case the JobManager cannot be 
informed of the
    +        // state transition
     
    -      // tell the task about the availability of some new input partitions
    -      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    -        updateTaskInputPartitions(executionID, partitionInfos)
    +        case updateMsg@UpdateTaskExecutionState(taskExecutionState: 
TaskExecutionState) =>
     
    -      // discards intermediate result partitions of a task execution on 
this TaskManager
    -      case FailIntermediateResultPartitions(executionID) =>
    -        log.info("Discarding the results produced by task execution " + 
executionID)
    -        if (network.isAssociated) {
    -          try {
    -            
network.getPartitionManager.releasePartitionsProducedBy(executionID)
    -          } catch {
    -            case t: Throwable => killTaskManagerFatal(
    -                "Fatal leak: Unable to release intermediate result 
partition data", t)
    -          }
    -        }
    +          // we receive these from our tasks and forward them to the 
JobManager
    --- End diff --
    
    Are we sure nothing changed there in the logic? We have spent so much time 
getting this delicate thing right, I am always extra careful when seeing 
changes there.


> Report accumulators periodically while job is running
> -----------------------------------------------------
>
>                 Key: FLINK-2292
>                 URL: https://issues.apache.org/jira/browse/FLINK-2292
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, TaskManager
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 0.10
>
>
> Accumulators should be sent periodically, as part of the heartbeat that sends 
> metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to