[FLINK-4776] [distributed coordination] Move ExecutionGraph initialization into the dedicated class ExecutionGraphBuilder
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05436f4b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05436f4b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05436f4b Branch: refs/heads/flip-6 Commit: 05436f4b64e771b22f13f56ff9e0ea7aa94b4ff7 Parents: 21b9f16 Author: Stephan Ewen <se...@apache.org> Authored: Fri Oct 7 19:58:24 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Oct 13 16:25:49 2016 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 4 +- .../runtime/executiongraph/ExecutionGraph.java | 8 +- .../executiongraph/ExecutionGraphBuilder.java | 262 +++++++++++++++++++ .../flink/runtime/jobmanager/JobManager.scala | 168 ++---------- 4 files changed, 297 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05436f4b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 4428427..e95afe0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -171,7 +171,7 @@ public class CheckpointCoordinator { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, SavepointStore savepointStore, - CheckpointStatsTracker statsTracker) throws Exception { + CheckpointStatsTracker statsTracker) { // sanity checks checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero"); @@ -207,7 +207,7 @@ public 
class CheckpointCoordinator { // issues a blocking call to ZooKeeper. checkpointIDCounter.start(); } catch (Throwable t) { - throw new Exception("Failed to start checkpoint ID counter: " + t.getMessage(), t); + throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t); } } http://git-wip-us.apache.org/repos/asf/flink/blob/05436f4b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 6023205..cf98ca6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -359,7 +359,7 @@ public class ExecutionGraph { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, SavepointStore savepointStore, - CheckpointStatsTracker statsTracker) throws Exception { + CheckpointStatsTracker statsTracker) { // simple sanity checks if (interval < 10 || checkpointTimeout < 10) { @@ -374,7 +374,11 @@ public class ExecutionGraph { ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); // disable to make sure existing checkpoint coordinators are cleared - disableSnaphotCheckpointing(); + try { + disableSnaphotCheckpointing(); + } catch (Throwable t) { + LOG.error("Error while shutting down checkpointer."); + } checkpointStatsTracker = Objects.requireNonNull(statsTracker, "Checkpoint stats tracker"); http://git-wip-us.apache.org/repos/asf/flink/blob/05436f4b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java new file mode 100644 index 0000000..1c6eb8d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; + +import org.slf4j.Logger; + +import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.duration.FiniteDuration; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class to encapsulate the logic of building an {@link ExecutionGraph} from a {@link JobGraph}. + */ +public class ExecutionGraphBuilder { + + /** + * Builds the ExecutionGraph from the JobGraph. 
+ * If a prior execution graph exists, the JobGraph will be attached. If no prior execution + * graph exists, then the JobGraph will be attached to a new empty execution graph. + */ + public static ExecutionGraph buildGraph( + @Nullable ExecutionGraph prior, + JobGraph jobGraph, + Configuration jobManagerConfig, + Executor executor, + ClassLoader classLoader, + CheckpointRecoveryFactory recoveryFactory, + SavepointStore savepointStore, + Time timeout, + RestartStrategy restartStrategy, + MetricGroup metrics, + int parallelismForAutoMax, + Logger log) + throws JobExecutionException, JobException + { + final ExecutionContext executionContext = ExecutionContext$.MODULE$.fromExecutor(executor); + + return buildGraph(prior, jobGraph, jobManagerConfig, executionContext, + classLoader, recoveryFactory, savepointStore, timeout, restartStrategy, + metrics, parallelismForAutoMax, log); + } + + /** + * Builds the ExecutionGraph from the JobGraph. + * If a prior execution graph exists, the JobGraph will be attached. If no prior execution + * graph exists, then the JobGraph will be attached to a new empty execution graph. + */ + public static ExecutionGraph buildGraph( + @Nullable ExecutionGraph prior, + JobGraph jobGraph, + Configuration jobManagerConfig, + ExecutionContext executionContext, + ClassLoader classLoader, + CheckpointRecoveryFactory recoveryFactory, + SavepointStore savepointStore, + Time timeout, + RestartStrategy restartStrategy, + MetricGroup metrics, + int parallelismForAutoMax, + Logger log) + throws JobExecutionException, JobException + { + checkNotNull(jobGraph, "job graph cannot be null"); + + final String jobName = jobGraph.getName(); + final JobID jobId = jobGraph.getJobID(); + + // create a new execution graph, if none exists so far + final ExecutionGraph executionGraph = (prior != null) ? 
prior : + new ExecutionGraph( + executionContext, + jobId, + jobName, + jobGraph.getJobConfiguration(), + jobGraph.getSerializedExecutionConfig(), + new FiniteDuration(timeout.getSize(), timeout.getUnit()), + restartStrategy, + jobGraph.getUserJarBlobKeys(), + jobGraph.getClasspaths(), + classLoader, + metrics); + + // set the basic properties + + executionGraph.setScheduleMode(jobGraph.getScheduleMode()); + executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()); + + try { + executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)); + } + catch (Throwable t) { + log.warn("Cannot create JSON plan for job", t); + // give the graph an empty plan + executionGraph.setJsonPlan("{}"); + } + + // initialize the vertices that have a master initialization hook + // file output formats create directories here, input formats create splits + + final long initMasterStart = System.nanoTime(); + log.info("Running initialization on master for job {} ({}).", jobName, jobId); + + for (JobVertex vertex : jobGraph.getVertices()) { + String executableClass = vertex.getInvokableClassName(); + if (executableClass == null || executableClass.isEmpty()) { + throw new JobSubmissionException(jobId, + "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class."); + } + + if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { + vertex.setParallelism(parallelismForAutoMax); + } + + try { + vertex.initializeOnMaster(classLoader); + } + catch (Throwable t) { + throw new JobExecutionException(jobId, + "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t); + } + } + + log.info("Successfully ran initialization on master in {} ms.", + (System.nanoTime() - initMasterStart) / 1_000_000); + + // topologically sort the job vertices and attach the graph to the existing one + List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); + if (log.isDebugEnabled()) { + log.debug("Adding 
{} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId); + } + executionGraph.attachJobGraph(sortedTopology); + + if (log.isDebugEnabled()) { + log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId); + } + + // configure the state checkpointing + JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings(); + if (snapshotSettings != null) { + + List<ExecutionJobVertex> triggerVertices = + idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph); + + List<ExecutionJobVertex> ackVertices = + idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph); + + List<ExecutionJobVertex> confirmVertices = + idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph); + + CompletedCheckpointStore completedCheckpoints; + CheckpointIDCounter checkpointIdCounter; + try { + completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, classLoader); + checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId); + } + catch (Exception e) { + throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e); + } + + // Checkpoint stats tracker + boolean isStatsDisabled = jobManagerConfig.getBoolean( + ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE); + + CheckpointStatsTracker checkpointStatsTracker; + if (isStatsDisabled) { + checkpointStatsTracker = new DisabledCheckpointStatsTracker(); + } + else { + int historySize = jobManagerConfig.getInteger( + ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); + + checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, metrics); + } + + executionGraph.enableSnapshotCheckpointing( + snapshotSettings.getCheckpointInterval(), + snapshotSettings.getCheckpointTimeout(), + snapshotSettings.getMinPauseBetweenCheckpoints(), + 
snapshotSettings.getMaxConcurrentCheckpoints(), + triggerVertices, + ackVertices, + confirmVertices, + checkpointIdCounter, + completedCheckpoints, + savepointStore, + checkpointStatsTracker); + } + + return executionGraph; + } + + private static List<ExecutionJobVertex> idToVertex( + List<JobVertexID> jobVertices, ExecutionGraph executionGraph) throws IllegalArgumentException { + + List<ExecutionJobVertex> result = new ArrayList<>(jobVertices.size()); + + for (JobVertexID id : jobVertices) { + ExecutionJobVertex vertex = executionGraph.getJobVertex(id); + if (vertex != null) { + result.add(vertex); + } else { + throw new IllegalArgumentException( + "The snapshot checkpointing settings refer to non-existent vertex " + id); + } + } + + return result; + } + + // ------------------------------------------------------------------------ + + /** This class is not supposed to be instantiated */ + private ExecutionGraphBuilder() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/05436f4b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 01f9cec..e90f2d2 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -29,7 +29,8 @@ import akka.actor.Status.{Failure, Success} import akka.actor._ import akka.pattern.ask import grizzled.slf4j.Logger -import org.apache.flink.api.common.{ExecutionConfig, JobID} +import org.apache.flink.api.common.JobID +import org.apache.flink.api.common.time.Time import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.fs.FileSystem import 
org.apache.flink.core.io.InputSplitAssigner @@ -49,11 +50,10 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger} +import org.apache.flink.runtime.executiongraph.{ExecutionGraphBuilder, ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger} import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} import org.apache.flink.runtime.io.network.PartitionState -import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator -import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} +import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService} @@ -1114,7 +1114,7 @@ class JobManager( Option(jobGraph.getSerializedExecutionConfig() .deserializeValue(userCodeLoader) .getRestartStrategy()) - .map(RestartStrategyFactory.createRestartStrategy(_)) match { + .map(RestartStrategyFactory.createRestartStrategy) match { case Some(strategy) => strategy case None => restartStrategyFactory.createRestartStrategy() } @@ -1131,148 +1131,34 @@ class JobManager( new UnregisteredMetricsGroup() } + val numSlots = scheduler.getTotalNumberOfSlots() + // see if there already exists an ExecutionGraph for the corresponding job ID - executionGraph = currentJobs.get(jobGraph.getJobID) match { + val registerNewGraph = currentJobs.get(jobGraph.getJobID) match { case Some((graph, currentJobInfo)) => + 
executionGraph = graph currentJobInfo.setLastActive() - graph + false case None => - val graph = new ExecutionGraph( - executionContext, - jobGraph.getJobID, - jobGraph.getName, - jobGraph.getJobConfiguration, - jobGraph.getSerializedExecutionConfig, - timeout, - restartStrategy, - jobGraph.getUserJarBlobKeys, - jobGraph.getClasspaths, - userCodeLoader, - jobMetrics) - - currentJobs.put(jobGraph.getJobID, (graph, jobInfo)) - graph - } - - executionGraph.setScheduleMode(jobGraph.getScheduleMode()) - executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()) - - try { - executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)) - } - catch { - case t: Throwable => - log.warn("Cannot create JSON plan for job", t) - executionGraph.setJsonPlan("{}") - } - - // initialize the vertices that have a master initialization hook - // file output formats create directories here, input formats create splits - if (log.isDebugEnabled) { - log.debug(s"Running initialization on master for job $jobId ($jobName).") - } - - val numSlots = scheduler.getTotalNumberOfSlots() - - for (vertex <- jobGraph.getVertices.asScala) { - val executableClass = vertex.getInvokableClassName - if (executableClass == null || executableClass.length == 0) { - throw new JobSubmissionException(jobId, - s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.") - } - - if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { - vertex.setParallelism(numSlots) - } - - try { - vertex.initializeOnMaster(userCodeLoader) - } - catch { - case t: Throwable => - throw new JobExecutionException(jobId, - "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t) - } + true } - // topologically sort the job vertices and attach the graph to the existing one - val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources() - if (log.isDebugEnabled) { - log.debug(s"Adding ${sortedTopology.size()} vertices from " + - s"job graph $jobId 
($jobName).") - } - executionGraph.attachJobGraph(sortedTopology) - - if (log.isDebugEnabled) { - log.debug("Successfully created execution graph from job " + - s"graph $jobId ($jobName).") - } - - // configure the state checkpointing - val snapshotSettings = jobGraph.getSnapshotSettings - if (snapshotSettings != null) { - val jobId = jobGraph.getJobID() - - val idToVertex: JobVertexID => ExecutionJobVertex = id => { - val vertex = executionGraph.getJobVertex(id) - if (vertex == null) { - throw new JobSubmissionException(jobId, - "The snapshot checkpointing settings refer to non-existent vertex " + id) - } - vertex - } - - val triggerVertices: java.util.List[ExecutionJobVertex] = - snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava - - val ackVertices: java.util.List[ExecutionJobVertex] = - snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava - - val confirmVertices: java.util.List[ExecutionJobVertex] = - snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava - - val completedCheckpoints = checkpointRecoveryFactory - .createCheckpointStore(jobId, userCodeLoader) - - val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId) - - // Checkpoint stats tracker - val isStatsDisabled: Boolean = flinkConfiguration.getBoolean( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE) - - val checkpointStatsTracker : CheckpointStatsTracker = - if (isStatsDisabled) { - new DisabledCheckpointStatsTracker() - } else { - val historySize: Int = flinkConfiguration.getInteger( - ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, - ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE) - - new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics) - } - - val jobParallelism = jobGraph.getSerializedExecutionConfig - .deserializeValue(userCodeLoader).getParallelism() - - val parallelism = if (jobParallelism == 
ExecutionConfig.PARALLELISM_AUTO_MAX) { - numSlots - } else { - jobParallelism - } - - executionGraph.enableSnapshotCheckpointing( - snapshotSettings.getCheckpointInterval, - snapshotSettings.getCheckpointTimeout, - snapshotSettings.getMinPauseBetweenCheckpoints, - snapshotSettings.getMaxConcurrentCheckpoints, - triggerVertices, - ackVertices, - confirmVertices, - checkpointIdCounter, - completedCheckpoints, - savepointStore, - checkpointStatsTracker) + executionGraph = ExecutionGraphBuilder.buildGraph( + executionGraph, + jobGraph, + flinkConfiguration, + executionContext, + userCodeLoader, + checkpointRecoveryFactory, + savepointStore, + Time.of(timeout.length, timeout.unit), + restartStrategy, + jobMetrics, + numSlots, + log.logger) + + if (registerNewGraph) { + currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo)) } // get notified about job status changes