[ 
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621967#comment-14621967
 ] 

ASF GitHub Bot commented on FLINK-2292:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/896#discussion_r34335380
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
    @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
         if (!isConnected) {
           log.debug(s"Dropping message $message because the TaskManager is 
currently " +
             "not connected to a JobManager.")
    -    }
    +    } else {
     
    -    // we order the messages by frequency, to make sure the code paths for 
matching
    -    // are as short as possible
    -    message match {
    +      // we order the messages by frequency, to make sure the code paths 
for matching
    +      // are as short as possible
    +      message match {
    +
    +        // tell the task about the availability of a new input partition
    +        case UpdateTaskSinglePartitionInfo(executionID, resultID, 
partitionInfo) =>
    +          updateTaskInputPartitions(executionID, List((resultID, 
partitionInfo)))
    +
    +        // tell the task about the availability of some new input 
partitions
    +        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) 
=>
    +          updateTaskInputPartitions(executionID, partitionInfos)
    +
    +        // discards intermediate result partitions of a task execution on 
this TaskManager
    +        case FailIntermediateResultPartitions(executionID) =>
    +          log.info("Discarding the results produced by task execution " + 
executionID)
    +          if (network.isAssociated) {
    +            try {
    +              
network.getPartitionManager.releasePartitionsProducedBy(executionID)
    +            } catch {
    +              case t: Throwable => killTaskManagerFatal(
    +                "Fatal leak: Unable to release intermediate result 
partition data", t)
    +            }
    +          }
     
    -      // tell the task about the availability of a new input partition
    -      case UpdateTaskSinglePartitionInfo(executionID, resultID, 
partitionInfo) =>
    -        updateTaskInputPartitions(executionID, List((resultID, 
partitionInfo)))
    +        // notifies the TaskManager that the state of a task has changed.
    +        // the TaskManager informs the JobManager and cleans up in case 
the transition
    +        // was into a terminal state, or in case the JobManager cannot be 
informed of the
    +        // state transition
     
    -      // tell the task about the availability of some new input partitions
    -      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    -        updateTaskInputPartitions(executionID, partitionInfos)
    +        case updateMsg@UpdateTaskExecutionState(taskExecutionState: 
TaskExecutionState) =>
     
    -      // discards intermediate result partitions of a task execution on 
this TaskManager
    -      case FailIntermediateResultPartitions(executionID) =>
    -        log.info("Discarding the results produced by task execution " + 
executionID)
    -        if (network.isAssociated) {
    -          try {
    -            
network.getPartitionManager.releasePartitionsProducedBy(executionID)
    -          } catch {
    -            case t: Throwable => killTaskManagerFatal(
    -                "Fatal leak: Unable to release intermediate result 
partition data", t)
    -          }
    -        }
    +          // we receive these from our tasks and forward them to the 
JobManager
    --- End diff --
    
    This is a bug I discovered while reading through the code. It prevents 
processing of messages when the task manager is not connected to the job 
manager. If you look at line 307, it says it would skip the message but 
continues to process it. If you want I can open a separate pull request.


> Report accumulators periodically while job is running
> -----------------------------------------------------
>
>                 Key: FLINK-2292
>                 URL: https://issues.apache.org/jira/browse/FLINK-2292
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, TaskManager
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 0.10
>
>
> Accumulators should be sent periodically, as part of the heartbeat that sends 
> metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to