[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15526749#comment-15526749 ]

ASF GitHub Bot commented on FLINK-4657:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80740727
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
                //TODO:: register at the RM
        }
     
    +   @RpcMethod
    +   public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +           final byte[] serializedInputSplit;
    +
    +           final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +           if (execution == null) {
    +                   log.error("Can not find Execution for attempt {}.", executionAttempt);
    +                   return null;
    +           } else {
    +                   final Slot slot = execution.getAssignedResource();
    +                   final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +                   final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +                   final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +                   if (vertex != null) {
    +                           final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +                           if (splitAssigner != null) {
    +                                   final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +                                   log.debug("Send next input split {}.", nextInputSplit);
    +                                   try {
    +                                           serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +                                   } catch (Exception ex) {
    +                                           log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
    +                                           vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
    +                                                   nextInputSplit.getClass() + ".", ex));
    +                                           return null;
    +                                   }
    +                           } else {
    +                                   log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +                                   return null;
    +                           }
    +                   } else {
    +                           log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +                           return null;
    +                   }
    +           }
    +           return new NextInputSplit(serializedInputSplit);
    +   }
    +
    +   @RpcMethod
    +   public PartitionState requestPartitionState(
    +           final ResultPartitionID partitionId,
    +           final ExecutionAttemptID taskExecutionId,
    +           final IntermediateDataSetID taskResultId)
    +   {
    +           final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +           final ExecutionState state = execution != null ? execution.getState() : null;
    +           return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
    +   }
    +
    +   @RpcMethod
    +   public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
    +           final JobID jobID = executionGraph.getJobID();
    +           final String jobName = executionGraph.getJobName();
    +           log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
    +
    +           if (newJobStatus.isGloballyTerminalState()) {
    +                   // TODO set job end time in JobInfo
    +
    +                   /*
    +                     TODO
    +                     if (jobInfo.sessionAlive) {
    +                jobInfo.setLastActive()
    +                val lastActivity = jobInfo.lastActive
    +                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
    +                  // remove only if no activity occurred in the meantime
    +                  if (lastActivity == jobInfo.lastActive) {
    +                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +                  }
    +                }(context.dispatcher)
    +              } else {
    +                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +              }
    +                    */
    +
    +                   if (newJobStatus == JobStatus.FINISHED) {
    +                           try {
    +                                   final Map<String, SerializedValue<Object>> accumulatorResults =
    +                                           executionGraph.getAccumulatorsSerialized();
    +                                   final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
    +                                           jobID, 0, accumulatorResults // TODO get correct job duration
    +                                   );
    +                                   jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
    +                           } catch (Exception e) {
    +                                   log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
    +                                   final JobExecutionException exception = new JobExecutionException(
    +                                           jobID, "Failed to retrieve accumulator results.", e);
    +                                   // TODO should we also notify client?
    +                                   jobCompletionActions.jobFailed(exception);
    +                           }
    +                   }
    +                   else if (newJobStatus == JobStatus.CANCELED) {
    +                           final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    --- End diff --
    
    On "Canceled", we probably should not report and lingering exceptions.


> Implement HighAvailabilityServices based on zookeeper
> -----------------------------------------------------
>
>                 Key: FLINK-4657
>                 URL: https://issues.apache.org/jira/browse/FLINK-4657
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: Kurt Young
>
> For FLIP-6, we will have the ResourceManager and every JobManager as potential 
> leader contenders and retrievers. We should separate them by using different 
> ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for the RM, and for each 
> JM the path could be /leader/job-managers/JobID.
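> A minimal sketch of that layout is below. It is illustrative only: the class
> name, the helper, the placeholder JobID, and the "localhost:2181" connect
> string are assumptions, not the proposed API. It uses Apache Curator's
> LeaderLatch recipe, which Flink's ZooKeeper utilities build on; the point is
> simply that distinct paths keep the RM and each JM election independent.
>
>     import org.apache.curator.framework.CuratorFramework;
>     import org.apache.curator.framework.CuratorFrameworkFactory;
>     import org.apache.curator.framework.recipes.leader.LeaderLatch;
>     import org.apache.curator.retry.ExponentialBackoffRetry;
>
>     public class ZooKeeperLeaderPaths {
>
>         // one fixed election path for the ResourceManager
>         static final String RESOURCE_MANAGER_PATH = "/leader/resource-manager";
>
>         // one election path per JobManager, keyed by its JobID
>         static String jobManagerPath(String jobId) {
>             return "/leader/job-managers/" + jobId;
>         }
>
>         public static void main(String[] args) throws Exception {
>             CuratorFramework client = CuratorFrameworkFactory.newClient(
>                 "localhost:2181", new ExponentialBackoffRetry(1000, 3));
>             client.start();
>
>             // separate latch paths mean the RM election and each JM
>             // election never contend with one another
>             LeaderLatch rmLatch = new LeaderLatch(client, RESOURCE_MANAGER_PATH);
>             LeaderLatch jmLatch = new LeaderLatch(
>                 client, jobManagerPath("0123456789abcdef0123456789abcdef"));
>             rmLatch.start();
>             jmLatch.start();
>         }
>     }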


