[ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15526749#comment-15526749 ]
ASF GitHub Bot commented on FLINK-4657:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80740727

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    +		} else {
    +			final Slot slot = execution.getAssignedResource();
    +			final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +			if (vertex != null) {
    +				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +				if (splitAssigner != null) {
    +					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +					log.debug("Send next input split {}.", nextInputSplit);
    +					try {
    +						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +					} catch (Exception ex) {
    +						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
    +						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
    +							nextInputSplit.getClass() + ".", ex));
    +						return null;
    +					}
    +				} else {
    +					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +					return null;
    +				}
    +			} else {
    +				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +				return null;
    +			}
    +		}
    +		return new NextInputSplit(serializedInputSplit);
    +	}
    +
    +	@RpcMethod
    +	public PartitionState requestPartitionState(
    +			final ResultPartitionID partitionId,
    +			final ExecutionAttemptID taskExecutionId,
    +			final IntermediateDataSetID taskResultId)
    +	{
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +		final ExecutionState state = execution != null ? execution.getState() : null;
    +		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
    +	}
    +
    +	@RpcMethod
    +	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
    +		final JobID jobID = executionGraph.getJobID();
    +		final String jobName = executionGraph.getJobName();
    +		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
    +
    +		if (newJobStatus.isGloballyTerminalState()) {
    +			// TODO set job end time in JobInfo
    +
    +			/*
    +			   TODO
    +			   if (jobInfo.sessionAlive) {
    +			     jobInfo.setLastActive()
    +			     val lastActivity = jobInfo.lastActive
    +			     context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
    +			       // remove only if no activity occurred in the meantime
    +			       if (lastActivity == jobInfo.lastActive) {
    +			         self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +			       }
    +			     }(context.dispatcher)
    +			   } else {
    +			     self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +			   }
    +			*/
    +
    +			if (newJobStatus == JobStatus.FINISHED) {
    +				try {
    +					final Map<String, SerializedValue<Object>> accumulatorResults =
    +						executionGraph.getAccumulatorsSerialized();
    +					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
    +						jobID, 0, accumulatorResults // TODO get correct job duration
    +					);
    +					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
    +				} catch (Exception e) {
    +					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
    +					final JobExecutionException exception = new JobExecutionException(
    +						jobID, "Failed to retrieve accumulator results.", e);
    +					// TODO should we also notify client?
    +					jobCompletionActions.jobFailed(exception);
    +				}
    +			}
    +			else if (newJobStatus == JobStatus.CANCELED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    --- End diff --
    
    On "Canceled", we probably should not report any lingering exceptions.


> Implement HighAvailabilityServices based on zookeeper
> -----------------------------------------------------
>
>                 Key: FLINK-4657
>                 URL: https://issues.apache.org/jira/browse/FLINK-4657
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: Kurt Young
>
> For FLIP-6, the ResourceManager and every JobManager will be potential leader contenders and retrievers. We should separate them by using different ZooKeeper paths.
> For example, the path could be /leader/resource-manager for the RM, and for each JM the path could be /leader/job-managers/JobID.
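As a rough illustration of the path layout proposed above (not code from this PR or from Flink's implementation), the following sketch uses Apache Curator's LeaderLatch to register one leader contender per path. The connection string, the path constants, and the jobManagerLeaderPath helper are assumptions made for the example.

// Hedged sketch only: illustrates the per-component ZooKeeper leader paths from the
// issue description using Apache Curator's LeaderLatch. The paths, connection string,
// and helper below are illustrative assumptions, not Flink's actual implementation.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.api.common.JobID;

public class LeaderPathSketch {

	// One fixed path for the ResourceManager ...
	private static final String RESOURCE_MANAGER_LEADER_PATH = "/leader/resource-manager";
	// ... and a per-job path prefix for the JobManagers.
	private static final String JOB_MANAGER_LEADER_PATH_PREFIX = "/leader/job-managers/";

	/** Hypothetical helper: each JobManager contends under a path keyed by its JobID. */
	static String jobManagerLeaderPath(JobID jobId) {
		return JOB_MANAGER_LEADER_PATH_PREFIX + jobId;
	}

	public static void main(String[] args) throws Exception {
		CuratorFramework client = CuratorFrameworkFactory.newClient(
				"localhost:2181", new ExponentialBackoffRetry(1000, 3));
		client.start();

		// Separate latches mean the RM and each JM run their leader elections independently.
		LeaderLatch rmLatch = new LeaderLatch(client, RESOURCE_MANAGER_LEADER_PATH);
		LeaderLatch jmLatch = new LeaderLatch(client, jobManagerLeaderPath(new JobID()));

		rmLatch.start();
		jmLatch.start();

		System.out.println("RM has leadership: " + rmLatch.hasLeadership());

		jmLatch.close();
		rmLatch.close();
		client.close();
	}
}

Keeping the ResourceManager and each JobManager under separate paths means their elections do not interfere with one another, and the per-job leader nodes can be cleaned up when a job finishes.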
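Separately, regarding the review comment above on the CANCELED branch: below is a minimal, hypothetical sketch of how a lingering error could be left out of what is reported for a cancelled job. The helper method and message text are assumptions for illustration, not the fix that went into the PR.

// Hedged sketch, not the PR's code: a CANCELED job ignores any lingering error
// instead of reporting it, while other terminal failures keep the cause attached.
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobStatus;

public class CanceledReportingSketch {

	/** Hypothetical helper building the exception handed to the job-completion actions. */
	static JobExecutionException terminalFailure(JobID jobID, JobStatus status, Throwable lingeringError) {
		if (status == JobStatus.CANCELED) {
			// Cancellation is user-initiated, so any exception that happens to be
			// attached to the status change is deliberately not propagated.
			return new JobExecutionException(jobID, "Job was cancelled.");
		}
		return new JobExecutionException(jobID, "Job execution failed.", lingeringError);
	}

	public static void main(String[] args) {
		JobID jobID = new JobID();
		JobExecutionException e = terminalFailure(jobID, JobStatus.CANCELED, new RuntimeException("stale error"));
		// Prints the cancellation message; the stale error is not part of it.
		System.out.println(e.getMessage() + " / cause: " + e.getCause());
	}
}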