[FLINK-4657] Implement HighAvailabilityServices based on ZooKeeper [FLINK-4657] Implement a few rpc calls for JobMaster
[FLINK-4657][cluster management] Address review comments [FLINK-4657][cluster management] Throw exception when error occurred when request input split Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/214113eb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/214113eb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/214113eb Branch: refs/heads/flip-6 Commit: 214113eb494d83046e6b0b2dc4df49fc72d869f8 Parents: 291daf6 Author: Kurt Young <ykt...@gmail.com> Authored: Mon Sep 26 10:59:16 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:42 2016 +0200 ---------------------------------------------------------------------- .../HighAvailabilityServices.java | 7 +- .../runtime/highavailability/NonHaServices.java | 4 +- .../highavailability/ZookeeperHaServices.java | 82 ++++++++++ .../StandaloneSubmittedJobGraphStore.java | 5 + .../jobmanager/SubmittedJobGraphStore.java | 8 + .../ZooKeeperSubmittedJobGraphStore.java | 7 + .../runtime/jobmaster/JobManagerRunner.java | 18 +-- .../flink/runtime/jobmaster/JobMaster.java | 161 ++++++++++++++++++- .../runtime/jobmaster/JobMasterGateway.java | 54 ++++++- .../jobmaster/message/NextInputSplit.java | 39 +++++ .../resourcemanager/ResourceManager.java | 6 +- .../flink/runtime/util/ZooKeeperUtils.java | 82 ++++++++-- .../flink/runtime/taskmanager/TaskManager.scala | 6 +- .../TestingHighAvailabilityServices.java | 20 +-- .../jobmanager/JobManagerHARecoveryTest.java | 3 +- .../jobmaster/JobManagerRunnerMockTest.java | 11 +- .../slotmanager/SlotProtocolTest.java | 2 +- 17 files changed, 455 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index d67e927..a26886a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; * <li>JobManager leader election and leader retrieval</li> * <li>Persistence for checkpoint metadata</li> * <li>Registering the latest completed checkpoint(s)</li> + * <li>Persistence for submitted job graph</li> * </ul> */ public interface HighAvailabilityServices { @@ -48,12 +49,10 @@ public interface HighAvailabilityServices { * @return * @throws Exception */ - LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception; + LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception; /** * Gets the leader election service for the cluster's resource manager. - * @return - * @throws Exception */ LeaderElectionService getResourceManagerLeaderElectionService() throws Exception; @@ -62,7 +61,7 @@ public interface HighAvailabilityServices { * * @param jobID The identifier of the job running the election. 
*/ - LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception; + LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception; /** * Gets the checkpoint recovery factory for the job manager http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index a2c9cc4..2c6295c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -79,7 +79,7 @@ public class NonHaServices implements HighAvailabilityServices { } @Override - public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0)); } @@ -89,7 +89,7 @@ public class NonHaServices implements HighAvailabilityServices { } @Override - public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { return new StandaloneLeaderElectionService(); } http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java new file mode 100644 index 0000000..d26b668 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.util.ZooKeeperUtils; + +/** + * An implementation of the {@link HighAvailabilityServices} with zookeeper. 
+ */ +public class ZookeeperHaServices implements HighAvailabilityServices { + + private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = "/resource-manager"; + + /** The ZooKeeper client to use */ + private final CuratorFramework client; + + /** The runtime configuration */ + private final Configuration configuration; + + public ZookeeperHaServices(final CuratorFramework client, final Configuration configuration) { + this.client = client; + this.configuration = configuration; + } + + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX); + } + + @Override + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathSuffixForJob(jobID)); + } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + return ZooKeeperUtils.createLeaderElectionService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX); + } + + @Override + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathSuffixForJob(jobID)); + } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + return new ZooKeeperCheckpointRecoveryFactory(client, configuration); + } + + @Override + public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration); + } + + private static String getPathSuffixForJob(final JobID jobID) { + return String.format("/job-managers/%s", jobID); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java index 3041cde..00df935 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java @@ -62,4 +62,9 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore public List<SubmittedJobGraph> recoverJobGraphs() throws Exception { return Collections.emptyList(); } + + @Override + public boolean contains(JobID jobId) throws Exception { + return false; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java index bd628cd..4d544ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java @@ -64,6 +64,14 @@ public interface SubmittedJobGraphStore { void removeJobGraph(JobID jobId) throws Exception; /** + * Check whether the given {@link JobID} is exist. + * + * <p>It's also a flag indicates whether we should recover this job before we can do anything else, since all + * global terminated job will be removed from this store. 
+ */ + boolean contains(final JobID jobId) throws Exception; + + /** * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers). */ http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index ec05f1e..92093c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -266,6 +266,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { } } + @Override + public boolean contains(JobID jobId) throws Exception { + checkNotNull(jobId, "Job ID"); + String path = getPathForJob(jobId); + return jobGraphsInZooKeeper.exists(path) != -1; + } + /** * Monitors ZooKeeper for changes. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 6944d85..a096932 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -21,20 +21,18 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.RpcService; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.UUID; -import java.util.concurrent.Executor; /** * The runner for the job manager. 
It deals with job level leader election and make underlying job manager @@ -52,11 +50,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { private final OnCompletionActions toNotify; - /** The execution context which is used to execute futures */ - private final Executor executionContext; - - // TODO: use this to decide whether the job is finished by other - private final CheckpointRecoveryFactory checkpointRecoveryFactory; + /** Used to check whether a job needs to be run */ + private final SubmittedJobGraphStore submittedJobGraphStore; /** Leader election for this job */ private final LeaderElectionService leaderElectionService; @@ -87,9 +82,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { { this.jobGraph = jobGraph; this.toNotify = toNotify; - this.executionContext = rpcService.getExecutor(); - this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory(); - this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID()); + this.submittedJobGraphStore = haServices.getSubmittedJobGraphStore(); + this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID()); this.jobManager = new JobMaster( jobGraph, configuration, rpcService, haServices, @@ -271,7 +265,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { @VisibleForTesting boolean isJobFinishedByOthers() { - // TODO + // TODO: Fix return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 1e01c55..e67a167 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -24,6 +24,8 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -35,23 +37,32 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.io.network.PartitionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import 
org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmaster.message.NextInputSplit; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -61,13 +72,18 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.util.SerializedThrowable; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import scala.concurrent.ExecutionContext$; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Executor; @@ -491,9 +507,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { * @return Acknowledge the task execution state update */ @RpcMethod - public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) { - System.out.println("TaskExecutionState: " + taskExecutionState); - return Acknowledge.get(); + public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) { + if (taskExecutionState == null) { + return false; + } else { + 
return executionGraph.updateState(taskExecutionState); + } } //---------------------------------------------------------------------------------------------- @@ -511,6 +530,140 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { }); } + @RpcMethod + public NextInputSplit requestNextInputSplit( + final JobVertexID vertexID, + final ExecutionAttemptID executionAttempt) throws Exception + { + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); + if (execution == null) { + // can happen when JobManager had already unregistered this execution upon on task failure, + // but TaskManager get some delay to aware of that situation + if (log.isDebugEnabled()) { + log.debug("Can not find Execution for attempt {}.", executionAttempt); + } + // but we should TaskManager be aware of this + throw new Exception("Can not find Execution for attempt " + executionAttempt); + } + + final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); + if (vertex == null) { + log.error("Cannot find execution vertex for vertex ID {}.", vertexID); + throw new Exception("Cannot find execution vertex for vertex ID " + vertexID); + } + + final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner == null) { + log.error("No InputSplitAssigner for vertex ID {}.", vertexID); + throw new Exception("No InputSplitAssigner for vertex ID " + vertexID); + } + + final Slot slot = execution.getAssignedResource(); + final int taskId = execution.getVertex().getParallelSubtaskIndex(); + final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null; + final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + + if (log.isDebugEnabled()) { + log.debug("Send next input split {}.", nextInputSplit); + } + + try { + final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); + return new NextInputSplit(serializedInputSplit); + } catch (Exception ex) { + log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); + IOException reason = new IOException("Could not serialize the next input split of class " + + nextInputSplit.getClass() + ".", ex); + vertex.fail(reason); + throw reason; + } + } + + @RpcMethod + public PartitionState requestPartitionState( + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId) + { + final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId()); + final ExecutionState state = execution != null ? 
execution.getState() : null; + return new PartitionState(taskResultId, partitionId.getPartitionId(), state); + } + + @RpcMethod + public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) { + executionGraph.scheduleOrUpdateConsumers(partitionID); + } + + //---------------------------------------------------------------------------------------------- + // Internal methods + //---------------------------------------------------------------------------------------------- + + // TODO - wrap this as StatusListenerMessenger's callback with rpc main thread + private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { + final JobID jobID = executionGraph.getJobID(); + final String jobName = executionGraph.getJobName(); + log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error); + + if (newJobStatus.isGloballyTerminalState()) { + // TODO set job end time in JobInfo + + /* + TODO + if (jobInfo.sessionAlive) { + jobInfo.setLastActive() + val lastActivity = jobInfo.lastActive + context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { + // remove only if no activity occurred in the meantime + if (lastActivity == jobInfo.lastActive) { + self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } + }(context.dispatcher) + } else { + self ! 
decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + } + */ + + if (newJobStatus == JobStatus.FINISHED) { + try { + final Map<String, SerializedValue<Object>> accumulatorResults = + executionGraph.getAccumulatorsSerialized(); + final SerializedJobExecutionResult result = new SerializedJobExecutionResult( + jobID, 0, accumulatorResults // TODO get correct job duration + ); + jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader)); + } catch (Exception e) { + log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e); + final JobExecutionException exception = new JobExecutionException( + jobID, "Failed to retrieve accumulator results.", e); + // TODO should we also notify client? + jobCompletionActions.jobFailed(exception); + } + } + else if (newJobStatus == JobStatus.CANCELED) { + final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); + final JobExecutionException exception = new JobExecutionException( + jobID, "Job was cancelled.", unpackedError); + // TODO should we also notify client? + jobCompletionActions.jobFailed(exception); + } + else if (newJobStatus == JobStatus.FAILED) { + final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); + final JobExecutionException exception = new JobExecutionException( + jobID, "Job execution failed.", unpackedError); + // TODO should we also notify client? + jobCompletionActions.jobFailed(exception); + } + else { + final JobExecutionException exception = new JobExecutionException( + jobID, newJobStatus + " is not a terminal state."); + // TODO should we also notify client? 
+ jobCompletionActions.jobFailed(exception); + throw new RuntimeException(exception); + } + } + } + private void notifyOfNewResourceManagerLeader( final String resourceManagerAddress, final UUID resourceManagerLeaderId) { http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 6587ccb..686a3f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -19,7 +19,15 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.io.network.PartitionState; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.message.NextInputSplit; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -47,7 +55,47 @@ public interface JobMasterGateway extends RpcGateway { * Updates the task execution state for a given task. 
* * @param taskExecutionState New task execution state for a given task - * @return Future acknowledge of the task execution state update + * @return Future flag of the task execution state update result */ - Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState); + Future<Boolean> updateTaskExecutionState(TaskExecutionState taskExecutionState); + + /** + * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender + * as a {@link NextInputSplit} message. + * + * @param vertexID The job vertex id + * @param executionAttempt The execution attempt id + * @return The future of the input split. If there is no further input split, will return an empty object. + * @throws Exception if some error occurred or information mismatch. + */ + Future<NextInputSplit> requestNextInputSplit( + final JobVertexID vertexID, + final ExecutionAttemptID executionAttempt) throws Exception; + + /** + * Requests the current state of the partition. + * The state of a partition is currently bound to the state of the producing execution. + * + * @param partitionId The partition ID of the partition to request the state of. + * @param taskExecutionId The execution attempt ID of the task requesting the partition state. + * @param taskResultId The input gate ID of the task requesting the partition state. + * @return The future of the partition state + */ + Future<PartitionState> requestPartitionState( + final ResultPartitionID partitionId, + final ExecutionAttemptID taskExecutionId, + final IntermediateDataSetID taskResultId); + + /** + * Notifies the JobManager about available data for a produced partition. + * <p> + * There is a call to this method for each {@link ExecutionVertex} instance once per produced + * {@link ResultPartition} instance, either when first producing data (for pipelined executions) + * or when all data has been produced (for staged executions). 
+ * <p> + * The JobManager then can decide when to schedule the partition consumers of the given session. + * + * @param partitionID The partition which has already produced data + */ + void scheduleOrUpdateConsumers(final ResultPartitionID partitionID); } http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java new file mode 100644 index 0000000..fe511ed --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.message; + +import java.io.Serializable; + +/** + * Contains the next input split for a task. 
+ */ +public class NextInputSplit implements Serializable { + + private static final long serialVersionUID = -1355784074565856240L; + + private final byte[] splitData; + + public NextInputSplit(final byte[] splitData) { + this.splitData = splitData; + } + + public byte[] getSplitData() { + return splitData; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index f695de4..f45afa3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -129,7 +129,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> try { leaderElectionService.stop(); for (JobID jobID : jobMasterGateways.keySet()) { - highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop(); + highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop(); } super.shutDown(); } catch (Throwable e) { @@ -179,7 +179,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> final LeaderConnectionInfo jobMasterLeaderInfo; try { jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo( - highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); + highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); } catch (Exception e) { log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); throw new Exception("Failed to retrieve JobMasterLeaderRetriever"); @@ -203,7 +203,7 @@ public abstract class ResourceManager<WorkerType 
extends ResourceIDRetrievable> if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) { JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID); try { - LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID); + LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobManagerLeaderRetriever(jobID); jobMasterLeaderRetriever.start(jobMasterLeaderListener); } catch (Exception e) { log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 67fc397..c5d44b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -132,13 +132,46 @@ public class ZooKeeperUtils { * @throws Exception */ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( - Configuration configuration) throws Exception { - CuratorFramework client = startCuratorFramework(configuration); + final Configuration configuration) throws Exception + { + final CuratorFramework client = startCuratorFramework(configuration); + return createLeaderRetrievalService(client, configuration); + } + + /** + * Creates a {@link ZooKeeperLeaderRetrievalService} instance. + * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object containing the configuration values + * @return {@link ZooKeeperLeaderRetrievalService} instance. 
+ * @throws Exception + */ + public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( + final CuratorFramework client, + final Configuration configuration) throws Exception + { + return createLeaderRetrievalService(client, configuration, ""); + } + + /** + * Creates a {@link ZooKeeperLeaderRetrievalService} instance. + * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object containing the configuration values + * @param pathSuffix The path suffix which we want to append + * @return {@link ZooKeeperLeaderRetrievalService} instance. + * @throws Exception + */ + public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( + final CuratorFramework client, + final Configuration configuration, + final String pathSuffix) throws Exception + { String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, - ConfigConstants.ZOOKEEPER_LEADER_PATH); + configuration, + ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, + ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix; return new ZooKeeperLeaderRetrievalService(client, leaderPath); } @@ -171,16 +204,33 @@ public class ZooKeeperUtils { CuratorFramework client, Configuration configuration) throws Exception { - String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH, - ConfigConstants.ZOOKEEPER_LATCH_PATH); - String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, - ConfigConstants.ZOOKEEPER_LEADER_PATH); + return createLeaderElectionService(client, configuration, ""); + } + + /** + * Creates a {@link ZooKeeperLeaderElectionService} 
instance. + * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object containing the configuration values + * @param pathSuffix The path suffix which we want to append + * @return {@link ZooKeeperLeaderElectionService} instance. + * @throws Exception + */ + public static ZooKeeperLeaderElectionService createLeaderElectionService( + final CuratorFramework client, + final Configuration configuration, + final String pathSuffix) throws Exception + { + final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys( + configuration, + ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH, + ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix; + final String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( + configuration, + ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, + ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix; return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath); } http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index d16c1b0..7a764ca 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -52,6 +52,8 @@ import org.apache.flink.runtime.filecache.FileCache import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID} import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment +import 
org.apache.flink.runtime.io.network.netty.PartitionStateChecker +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier import org.apache.flink.runtime.jobgraph.IntermediateDataSetID import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.memory.MemoryManager @@ -944,12 +946,12 @@ class TaskManager( val partitionStateChecker = new ActorGatewayPartitionStateChecker( jobManagerGateway, - config.timeout) + new FiniteDuration(config.getTimeout().toMilliseconds, TimeUnit.MILLISECONDS)) val resultPartitionConsumableNotifier = new ActorGatewayResultPartitionConsumableNotifier( context.dispatcher, jobManagerGateway, - config.timeout) + new FiniteDuration(config.getTimeout().toMilliseconds, TimeUnit.MILLISECONDS)) connectionUtils = Some( (checkpointResponder, http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 1a5450d..faf69cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -36,7 +36,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>(); - private volatile LeaderElectionService jobMasterLeaderElectionService; + private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>(); 
private volatile LeaderElectionService resourceManagerLeaderElectionService; @@ -56,8 +56,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever); } - public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) { - this.jobMasterLeaderElectionService = leaderElectionService; + public void setJobMasterLeaderElectionService(JobID jobID, LeaderElectionService leaderElectionService) { + this.jobManagerLeaderElectionServices.put(jobID, leaderElectionService); } public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) { @@ -87,7 +87,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID); if (service != null) { return service; @@ -97,24 +97,24 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { - LeaderElectionService service = jobMasterLeaderElectionService; + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + LeaderElectionService service = resourceManagerLeaderElectionService; if (service != null) { return service; } else { - throw new IllegalStateException("JobMasterLeaderElectionService has not been set"); + throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set"); } } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { - LeaderElectionService service = resourceManagerLeaderElectionService; + public LeaderElectionService 
getJobManagerLeaderElectionService(JobID jobID) throws Exception { + LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID); if (service != null) { return service; } else { - throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set"); + throw new IllegalStateException("JobMasterLeaderElectionService has not been set"); } } http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 5ec6991..8419abe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -406,7 +406,8 @@ public class JobManagerHARecoveryTest { storedJobs.remove(jobId); } - boolean contains(JobID jobId) { + @Override + public boolean contains(JobID jobId) { return storedJobs.containsKey(jobId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index bfe5f55..3a769bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.rpc.RpcService; import org.junit.After; @@ -57,6 +58,8 @@ public class JobManagerRunnerMockTest { private LeaderElectionService leaderElectionService; + private SubmittedJobGraphStore submittedJobGraphStore; + private TestingOnCompletionActions jobCompletion; @Before @@ -72,8 +75,12 @@ public class JobManagerRunnerMockTest { leaderElectionService = mock(LeaderElectionService.class); when(leaderElectionService.hasLeadership()).thenReturn(true); + submittedJobGraphStore = mock(SubmittedJobGraphStore.class); + when(submittedJobGraphStore.contains(any(JobID.class))).thenReturn(true); + HighAvailabilityServices haServices = mock(HighAvailabilityServices.class); - when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService); + when(haServices.getJobManagerLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService); + when(haServices.getSubmittedJobGraphStore()).thenReturn(submittedJobGraphStore); runner = PowerMockito.spy(new JobManagerRunner( new JobGraph("test"), @@ -127,7 +134,7 @@ public class JobManagerRunnerMockTest { public void testJobFinishedByOtherBeforeGrantLeadership() throws Exception { runner.start(); - when(runner.isJobFinishedByOthers()).thenReturn(true); + when(submittedJobGraphStore.contains(any(JobID.class))).thenReturn(false); runner.grantLeadership(UUID.randomUUID()); // runner should shutdown automatic and informed the job completion http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index e3018c9..805ea71 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -237,7 +237,7 @@ public class SlotProtocolTest extends TestLogger { testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); final TestingLeaderElectionService jmLeaderElectionService = new TestingLeaderElectionService(); - testingHA.setJobMasterLeaderElectionService(jmLeaderElectionService); + testingHA.setJobMasterLeaderElectionService(jobID, jmLeaderElectionService); final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jmAddress, jmID); testingHA.setJobMasterLeaderRetriever(jobID, jmLeaderRetrievalService);