Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2594#discussion_r82563262
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -127,12 +195,423 @@ public void start() {
                }
        }
     
    +   /**
    +    * Called to shut down the TaskManager. The method closes all 
TaskManager services.
    +    */
    +   @Override
    +   public void shutDown() {
    +           log.info("Stopping TaskManager {}.", getAddress());
    +
    +           if (resourceManagerConnection.isConnected()) {
    +                   try {
    +                           resourceManagerConnection.close();
    +                   } catch (Exception e) {
    +                           log.error("Could not cleanly close the 
ResourceManager connection.", e);
    +                   }
    +           }
    +
    +           try {
    +                   ioManager.shutdown();
    +           } catch (Exception e) {
    +                   log.error("IOManager did not shut down properly.", e);
    +           }
    +
    +           try {
    +                   memoryManager.shutdown();
    +           } catch (Exception e) {
    +                   log.error("MemoryManager did not shut down properly.", 
e);
    +           }
    +
    +           try {
    +                   networkEnvironment.shutdown();
    +           } catch (Exception e) {
    +                   log.error("Network environment did not shut down 
properly.", e);
    +           }
    +
    +           try {
    +                   fileCache.shutdown();
    +           } catch (Exception e) {
    +                   log.error("File cache did not shut down properly.", e);
    +           }
    +
    +           try {
    +                   metricRegistry.shutdown();
    +           } catch (Exception e) {
    +                   log.error("MetricRegistry did not shut down properly.", 
e);
    +           }
    +
    +           log.info("Stopped TaskManager {}.", getAddress());
    +   }
    +
    +   // 
========================================================================
    +   //  RPC methods
    +   // 
========================================================================
    +
    +   // 
----------------------------------------------------------------------
    +   // Task lifecycle RPCs
    +   // 
----------------------------------------------------------------------
    +
    +   @RpcMethod
    +   public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID 
jobManagerID) throws TaskSubmissionException {
    +
    +           JobManagerConnection jobManagerConnection = 
getJobManagerConnection(jobManagerID);
    +
    +           if (jobManagerConnection == null) {
    +                   final String message = "Could not submit task because 
JobManager " + jobManagerID +
    +                           " was not associated.";
    +
    +                   log.debug(message);
    +                   throw new TaskSubmissionException(message);
    +           }
    +
    +           TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
    +
    +           if (taskSlot == null) {
    +                   final String message = "No task slot allocated for 
allocation ID " + tdd.getAllocationID() + '.';
    +                   log.debug(message);
    +                   throw new TaskSubmissionException(message);
    +           }
    +
    +           TaskMetricGroup taskMetricGroup = 
taskManagerMetricGroup.addTaskForJob(tdd);
    +
    +           InputSplitProvider inputSplitProvider = new 
RpcInputSplitProvider(
    +                   jobManagerConnection.getJobManagerGateway(),
    +                   tdd.getJobID(),
    +                   tdd.getVertexID(),
    +                   tdd.getExecutionId(),
    +                   taskManagerConfiguration.getTimeout());
    +
    +           TaskManagerActions taskManagerActions = 
jobManagerConnection.getTaskManagerActions();
    +           CheckpointResponder checkpointResponder = 
jobManagerConnection.getCheckpointResponder();
    +           LibraryCacheManager libraryCache = 
jobManagerConnection.getLibraryCacheManager();
    +           ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = 
jobManagerConnection.getResultPartitionConsumableNotifier();
    +           PartitionStateChecker partitionStateChecker = 
jobManagerConnection.getPartitionStateChecker();
    +
    +           Task task = new Task(
    +                   tdd,
    +                   memoryManager,
    +                   ioManager,
    +                   networkEnvironment,
    +                   broadcastVariableManager,
    +                   taskManagerActions,
    +                   inputSplitProvider,
    +                   checkpointResponder,
    +                   libraryCache,
    +                   fileCache,
    +                   taskManagerRuntimeInfo,
    +                   taskMetricGroup,
    +                   resultPartitionConsumableNotifier,
    +                   partitionStateChecker,
    +                   getRpcService().getExecutor());
    +
    +           log.info("Received task {}.", 
task.getTaskInfo().getTaskNameWithSubtasks());
    +
    +           if(taskSlot.add(task)) {
    +                   TaskSlotMapping taskSlotMapping = new 
TaskSlotMapping(task, taskSlot);
    +
    +                   taskSlotMappings.put(task.getExecutionId(), 
taskSlotMapping);
    +                   task.startTaskThread();
    +
    +                   return Acknowledge.get();
    +           } else {
    +                   final String message = "TaskManager already contains a 
task for id " +
    +                           task.getExecutionId() + '.';
    +
    +                   log.debug(message);
    +                   throw new TaskSubmissionException(message);
    +           }
    +   }
    +
    +   @RpcMethod
    +   public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
    +           final Task task = getTask(executionAttemptID);
    +
    +           if (task != null) {
    +                   try {
    +                           task.cancelExecution();
    +                           return Acknowledge.get();
    +                   } catch (Throwable t) {
    +                           throw new TaskException("Cannot cancel task for 
execution " + executionAttemptID + '.', t);
    +                   }
    +           } else {
    +                   final String message = "Cannot find task to stop for 
execution " + executionAttemptID + '.';
    +
    +                   log.debug(message);
    +                   throw new TaskException(message);
    +           }
    +   }
    +
    +   @RpcMethod
    +   public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
    +           final Task task = getTask(executionAttemptID);
    +
    +           if (task != null) {
    +                   try {
    +                           task.stopExecution();
    +                           return Acknowledge.get();
    +                   } catch (Throwable t) {
    +                           throw new TaskException("Cannot stop task for 
execution " + executionAttemptID + '.', t);
    +                   }
    +           } else {
    +                   final String message = "Cannot find task to stop for 
execution " + executionAttemptID + '.';
    +
    +                   log.debug(message);
    +                   throw new TaskException(message);
    +           }
    +   }
    +
    +   // 
----------------------------------------------------------------------
    +   // Partition lifecycle RPCs
    +   // 
----------------------------------------------------------------------
    +
    +   @RpcMethod
    +   public Acknowledge updatePartitions(final ExecutionAttemptID 
executionAttemptID, Collection<PartitionInfo> partitionInfos) throws 
PartitionException {
    +           final Task task = getTask(executionAttemptID);
    +
    +           if (task != null) {
    +                   for (final PartitionInfo partitionInfo: partitionInfos) 
{
    +                           IntermediateDataSetID 
intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
    +
    +                           final SingleInputGate singleInputGate = 
task.getInputGateById(intermediateResultPartitionID);
    +
    +                           if (singleInputGate != null) {
    +                                   // Run asynchronously because it might 
be blocking
    +                                   getRpcService().execute(new Runnable() {
    +                                           @Override
    +                                           public void run() {
    +                                                   try {
    +                                                           
singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
    +                                                   } catch (IOException | 
InterruptedException e) {
    +                                                           
log.error("Could not update input data location for task {}. Trying to fail 
task.", task.getTaskInfo().getTaskName(), e);
    +
    +                                                           try {
    +                                                                   
task.failExternally(e);
    +                                                           } catch 
(RuntimeException re) {
    +                                                                   // 
TODO: Check whether we need this or make exception in failExtenally checked
    +                                                                   
log.error("Failed canceling task with execution ID {} after task update 
failure.", executionAttemptID, re);
    +                                                           }
    +                                                   }
    +                                           }
    +                                   });
    +                           } else {
    +                                   throw new PartitionException("No reader 
with ID " +
    +                                           intermediateResultPartitionID + 
" for task " + executionAttemptID +
    +                                           " was found.");
    +                           }
    +                   }
    +
    +                   return Acknowledge.get();
    +           } else {
    +                   log.debug("Discard update for input partitions of task 
{}. Task is no longer running.", executionAttemptID);
    +                   return Acknowledge.get();
    +           }
    +   }
    +
    +   @RpcMethod
    +   public void failPartition(ExecutionAttemptID executionAttemptID) {
    +           log.info("Discarding the results produced by task execution 
{}.", executionAttemptID);
    +
    +           try {
    +                   
networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
    +           } catch (Throwable t) {
    +                   // TODO: Do we still need this catch branch?
    +                   onFatalError(t);
    +           }
    +
    +           // TODO: Maybe it's better to return an Acknowledge here to 
notify the JM about the success/failure with an Exception
    +   }
    +
    +   // 
----------------------------------------------------------------------
    +   // Checkpointing RPCs
    +   // 
----------------------------------------------------------------------
    +
    +   @RpcMethod
    +   public Acknowledge triggerCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp) throws 
CheckpointException {
    +           log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, 
checkpointTimestamp, executionAttemptID);
    +
    +           final Task task = getTask(executionAttemptID);
    +
    +           if (task != null) {
    +                   task.triggerCheckpointBarrier(checkpointId, 
checkpointTimestamp);
    +
    +                   return Acknowledge.get();
    +           } else {
    +                   final String message = "TaskManager received a 
checkpoint request for unknown task " + executionAttemptID + '.';
    +
    +                   log.debug(message);
    +                   throw new CheckpointException(message);
    +           }
    +   }
    +
    --- End diff --
    
    Good catch. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to