[ https://issues.apache.org/jira/browse/FLINK-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561670#comment-15561670 ]

ASF GitHub Bot commented on FLINK-4738:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2594#discussion_r82563262
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -127,12 +195,423 @@ public void start() {
                }
        }
     
    +   /**
    +    * Called to shut down the TaskManager. The method closes all 
TaskManager services.
    +    */
    +   @Override
    +   public void shutDown() {
    +           log.info("Stopping TaskManager {}.", getAddress());
    +
    +           if (resourceManagerConnection.isConnected()) {
    +                   try {
    +                           resourceManagerConnection.close();
    +                   } catch (Exception e) {
    +                           log.error("Could not cleanly close the 
ResourceManager connection.", e);
    +                   }
    +           }
    +
    +           try {
    +                   ioManager.shutdown();
    +           } catch (Exception e) {
    +                   log.error("IOManager did not shut down properly.", e);
    +           }
    +
    +           try {
    +                   memoryManager.shutdown();
    +           } catch (Exception e) {
    +                   log.error("MemoryManager did not shut down properly.", 
e);
    +           }
    +
    +           try {
    +                   networkEnvironment.shutdown();
    +           } catch (Exception e) {
    +                   log.error("Network environment did not shut down 
properly.", e);
    +           }
    +
    +           try {
    +                   fileCache.shutdown();
    +           } catch (Exception e) {
    +                   log.error("File cache did not shut down properly.", e);
    +           }
    +
    +           try {
    +                   metricRegistry.shutdown();
    +           } catch (Exception e) {
    +                   log.error("MetricRegistry did not shut down properly.", 
e);
    +           }
    +
    +           log.info("Stopped TaskManager {}.", getAddress());
    +   }
    +
    +   // 
========================================================================
    +   //  RPC methods
    +   // 
========================================================================
    +
    +   // 
----------------------------------------------------------------------
    +   // Task lifecycle RPCs
    +   // 
----------------------------------------------------------------------
    +
    +   @RpcMethod
    +   public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID 
jobManagerID) throws TaskSubmissionException {
    +
    +           JobManagerConnection jobManagerConnection = 
getJobManagerConnection(jobManagerID);
    +
    +           if (jobManagerConnection == null) {
    +                   final String message = "Could not submit task because 
JobManager " + jobManagerID +
    +                           " was not associated.";
    +
    +                   log.debug(message);
    +                   throw new TaskSubmissionException(message);
    +           }
    +
    +           TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
    +
    +           if (taskSlot == null) {
    +                   final String message = "No task slot allocated for 
allocation ID " + tdd.getAllocationID() + '.';
    +                   log.debug(message);
    +                   throw new TaskSubmissionException(message);
    +           }
    +
    +           TaskMetricGroup taskMetricGroup = 
taskManagerMetricGroup.addTaskForJob(tdd);
    +
    +           InputSplitProvider inputSplitProvider = new 
RpcInputSplitProvider(
    +                   jobManagerConnection.getJobManagerGateway(),
    +                   tdd.getJobID(),
    +                   tdd.getVertexID(),
    +                   tdd.getExecutionId(),
    +                   taskManagerConfiguration.getTimeout());
    +
    +           TaskManagerActions taskManagerActions = 
jobManagerConnection.getTaskManagerActions();
    +           CheckpointResponder checkpointResponder = 
jobManagerConnection.getCheckpointResponder();
    +           LibraryCacheManager libraryCache = 
jobManagerConnection.getLibraryCacheManager();
    +           ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = 
jobManagerConnection.getResultPartitionConsumableNotifier();
    +           PartitionStateChecker partitionStateChecker = 
jobManagerConnection.getPartitionStateChecker();
    +
    +           Task task = new Task(
    +                   tdd,
    +                   memoryManager,
    +                   ioManager,
    +                   networkEnvironment,
    +                   broadcastVariableManager,
    +                   taskManagerActions,
    +                   inputSplitProvider,
    +                   checkpointResponder,
    +                   libraryCache,
    +                   fileCache,
    +                   taskManagerRuntimeInfo,
    +                   taskMetricGroup,
    +                   resultPartitionConsumableNotifier,
    +                   partitionStateChecker,
    +                   getRpcService().getExecutor());
    +
    +           log.info("Received task {}.", 
task.getTaskInfo().getTaskNameWithSubtasks());
    +
    +           if(taskSlot.add(task)) {
    +                   TaskSlotMapping taskSlotMapping = new 
TaskSlotMapping(task, taskSlot);
    +
    +                   taskSlotMappings.put(task.getExecutionId(), 
taskSlotMapping);
    +                   task.startTaskThread();
    +
    +                   return Acknowledge.get();
    +           } else {
    +                   final String message = "TaskManager already contains a 
task for id " +
    +                           task.getExecutionId() + '.';
    +
    +                   log.debug(message);
    +                   throw new TaskSubmissionException(message);
    +           }
    +   }
    +
    +   @RpcMethod
    +   public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
    +           final Task task = getTask(executionAttemptID);
    +
    +           if (task != null) {
    +                   try {
    +                           task.cancelExecution();
    +                           return Acknowledge.get();
    +                   } catch (Throwable t) {
    +                           throw new TaskException("Cannot cancel task for 
execution " + executionAttemptID + '.', t);
    +                   }
    +           } else {
    +                   final String message = "Cannot find task to stop for 
execution " + executionAttemptID + '.';
    +
    +                   log.debug(message);
    +                   throw new TaskException(message);
    +           }
    +   }
    +
    +   @RpcMethod
    +   public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
    +           final Task task = getTask(executionAttemptID);
    +
    +           if (task != null) {
    +                   try {
    +                           task.stopExecution();
    +                           return Acknowledge.get();
    +                   } catch (Throwable t) {
    +                           throw new TaskException("Cannot stop task for 
execution " + executionAttemptID + '.', t);
    +                   }
    +           } else {
    +                   final String message = "Cannot find task to stop for 
execution " + executionAttemptID + '.';
    +
    +                   log.debug(message);
    +                   throw new TaskException(message);
    +           }
    +   }
    +
    +   // 
----------------------------------------------------------------------
    +   // Partition lifecycle RPCs
    +   // 
----------------------------------------------------------------------
    +
    +   @RpcMethod
    +   public Acknowledge updatePartitions(final ExecutionAttemptID 
executionAttemptID, Collection<PartitionInfo> partitionInfos) throws 
PartitionException {
    +           final Task task = getTask(executionAttemptID);
    +
    +           if (task != null) {
    +                   for (final PartitionInfo partitionInfo: partitionInfos) 
{
    +                           IntermediateDataSetID 
intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
    +
    +                           final SingleInputGate singleInputGate = 
task.getInputGateById(intermediateResultPartitionID);
    +
    +                           if (singleInputGate != null) {
    +                                   // Run asynchronously because it might 
be blocking
    +                                   getRpcService().execute(new Runnable() {
    +                                           @Override
    +                                           public void run() {
    +                                                   try {
    +                                                           
singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
    +                                                   } catch (IOException | 
InterruptedException e) {
    +                                                           
log.error("Could not update input data location for task {}. Trying to fail 
task.", task.getTaskInfo().getTaskName(), e);
    +
    +                                                           try {
    +                                                                   
task.failExternally(e);
    +                                                           } catch 
(RuntimeException re) {
    +                                                                   // 
TODO: Check whether we need this or make exception in failExtenally checked
    +                                                                   
log.error("Failed canceling task with execution ID {} after task update 
failure.", executionAttemptID, re);
    +                                                           }
    +                                                   }
    +                                           }
    +                                   });
    +                           } else {
    +                                   throw new PartitionException("No reader 
with ID " +
    +                                           intermediateResultPartitionID + 
" for task " + executionAttemptID +
    +                                           " was found.");
    +                           }
    +                   }
    +
    +                   return Acknowledge.get();
    +           } else {
    +                   log.debug("Discard update for input partitions of task 
{}. Task is no longer running.", executionAttemptID);
    +                   return Acknowledge.get();
    +           }
    +   }
    +
    +   @RpcMethod
    +   public void failPartition(ExecutionAttemptID executionAttemptID) {
    +           log.info("Discarding the results produced by task execution 
{}.", executionAttemptID);
    +
    +           try {
    +                   
networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
    +           } catch (Throwable t) {
    +                   // TODO: Do we still need this catch branch?
    +                   onFatalError(t);
    +           }
    +
    +           // TODO: Maybe it's better to return an Acknowledge here to 
notify the JM about the success/failure with an Exception
    +   }
    +
    +   // 
----------------------------------------------------------------------
    +   // Checkpointing RPCs
    +   // 
----------------------------------------------------------------------
    +
    +   @RpcMethod
    +   public Acknowledge triggerCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp) throws 
CheckpointException {
    +           log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, 
checkpointTimestamp, executionAttemptID);
    +
    +           final Task task = getTask(executionAttemptID);
    +
    +           if (task != null) {
    +                   task.triggerCheckpointBarrier(checkpointId, 
checkpointTimestamp);
    +
    +                   return Acknowledge.get();
    +           } else {
    +                   final String message = "TaskManager received a 
checkpoint request for unknown task " + executionAttemptID + '.';
    +
    +                   log.debug(message);
    +                   throw new CheckpointException(message);
    +           }
    +   }
    +
    --- End diff --
    
    Good catch. Will fix it.


> Port TaskManager logic to TaskExecutor
> --------------------------------------
>
>                 Key: FLINK-4738
>                 URL: https://issues.apache.org/jira/browse/FLINK-4738
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> Port the basic operations of the {{TaskManager}} to the {{TaskExecutor}}. 
> These operations include the task lifecycle methods, {{JobManager}} 
> association logic and setup of TaskManager components.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to