[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621967#comment-14621967 ]
ASF GitHub Bot commented on FLINK-2292:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/896#discussion_r34335380

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
           if (!isConnected) {
             log.debug(s"Dropping message $message because the TaskManager is currently " +
               "not connected to a JobManager.")
    -      }
    +      } else {

    -      // we order the messages by frequency, to make sure the code paths for matching
    -      // are as short as possible
    -      message match {
    +        // we order the messages by frequency, to make sure the code paths for matching
    +        // are as short as possible
    +        message match {
    +
    +          // tell the task about the availability of a new input partition
    +          case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
    +            updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
    +
    +          // tell the task about the availability of some new input partitions
    +          case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    +            updateTaskInputPartitions(executionID, partitionInfos)
    +
    +          // discards intermediate result partitions of a task execution on this TaskManager
    +          case FailIntermediateResultPartitions(executionID) =>
    +            log.info("Discarding the results produced by task execution " + executionID)
    +            if (network.isAssociated) {
    +              try {
    +                network.getPartitionManager.releasePartitionsProducedBy(executionID)
    +              } catch {
    +                case t: Throwable => killTaskManagerFatal(
    +                  "Fatal leak: Unable to release intermediate result partition data", t)
    +              }
    +            }

    -        // tell the task about the availability of a new input partition
    -        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
    -          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
    +          // notifies the TaskManager that the state of a task has changed.
    +          // the TaskManager informs the JobManager and cleans up in case the transition
    +          // was into a terminal state, or in case the JobManager cannot be informed of the
    +          // state transition

    -        // tell the task about the availability of some new input partitions
    -        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    -          updateTaskInputPartitions(executionID, partitionInfos)
    +          case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>

    -        // discards intermediate result partitions of a task execution on this TaskManager
    -        case FailIntermediateResultPartitions(executionID) =>
    -          log.info("Discarding the results produced by task execution " + executionID)
    -          if (network.isAssociated) {
    -            try {
    -              network.getPartitionManager.releasePartitionsProducedBy(executionID)
    -            } catch {
    -              case t: Throwable => killTaskManagerFatal(
    -                "Fatal leak: Unable to release intermediate result partition data", t)
    -            }
    -          }
    +          // we receive these from our tasks and forward them to the JobManager
    --- End diff --

    This change fixes a bug I discovered while reading through the code. It prevents messages from being processed when the TaskManager is not connected to a JobManager. If you look at line 307, the log statement says the message is dropped, but the code then continues to process it anyway. If you want, I can open a separate pull request for this fix.


> Report accumulators periodically while job is running
> -----------------------------------------------------
>
>                 Key: FLINK-2292
>                 URL: https://issues.apache.org/jira/browse/FLINK-2292
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, TaskManager
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 0.10
>
>
> Accumulators should be sent periodically, as part of the heartbeat that sends
> metrics. This allows them to be updated in real time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
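
The control-flow problem described in the review comment can be sketched in isolation. This is a minimal illustration under stated assumptions, not the TaskManager code: `DropMessageSketch`, `handleBuggy`, `handleFixed`, and the `processed` buffer are hypothetical names, and a plain string stands in for the actor message.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the message handler; none of these names
// come from the Flink code base.
object DropMessageSketch {

  // Before the fix: the `if` only logs, then execution falls through
  // and the message is processed regardless of the connection state.
  def handleBuggy(isConnected: Boolean, message: String,
                  processed: ListBuffer[String]): Unit = {
    if (!isConnected) {
      println(s"Dropping message $message because not connected.")
    }
    processed += message // runs even when not connected
  }

  // After the fix: processing sits in the `else` branch, so a message
  // received while disconnected is logged and actually dropped.
  def handleFixed(isConnected: Boolean, message: String,
                  processed: ListBuffer[String]): Unit = {
    if (!isConnected) {
      println(s"Dropping message $message because not connected.")
    } else {
      processed += message
    }
  }

  def main(args: Array[String]): Unit = {
    val buggy = ListBuffer.empty[String]
    val fixed = ListBuffer.empty[String]
    handleBuggy(isConnected = false, "m1", buggy)
    handleFixed(isConnected = false, "m1", fixed)
    println(s"buggy processed: ${buggy.size}, fixed processed: ${fixed.size}")
  }
}
```

With `isConnected = false`, the first variant still records the message while the second leaves its buffer empty, which is the behavioral difference the `} else {` in the diff introduces.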