[FLINK-4375] [distributed coordination] Implement new JobManager creation, initialization, and basic RPC methods
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34fef475 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34fef475 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34fef475 Branch: refs/heads/flip-6 Commit: 34fef4752e94b3d0c7afe7a9525799bb651a07b4 Parents: c8dc074 Author: Kurt Young <ykt...@gmail.com> Authored: Wed Oct 12 23:25:16 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Oct 13 16:25:49 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/util/ExceptionUtils.java | 20 + .../java/org/apache/flink/util/StringUtils.java | 14 + .../apache/flink/runtime/blob/BlobServer.java | 65 +- .../apache/flink/runtime/blob/BlobStore.java | 3 +- .../apache/flink/runtime/blob/BlobUtils.java | 8 +- .../flink/runtime/blob/FileSystemBlobStore.java | 53 +- .../flink/runtime/blob/VoidBlobStore.java | 2 +- .../CheckpointCoordinatorGateway.java | 26 +- .../HighAvailabilityServices.java | 13 + .../runtime/highavailability/NonHaServices.java | 13 + .../highavailability/RunningJobsRegistry.java | 66 ++ .../highavailability/ZookeeperHaServices.java | 115 ++- .../highavailability/nonha/NonHaRegistry.java | 62 ++ .../runtime/jobmanager/OnCompletionActions.java | 3 +- .../runtime/jobmaster/JobManagerRunner.java | 270 ++++-- .../runtime/jobmaster/JobManagerServices.java | 93 +- .../flink/runtime/jobmaster/JobMaster.java | 902 +++++++------------ .../runtime/jobmaster/JobMasterGateway.java | 114 +-- .../jobmaster/MiniClusterJobDispatcher.java | 61 +- .../message/DisposeSavepointResponse.java | 49 - .../message/TriggerSavepointResponse.java | 74 -- .../apache/flink/runtime/rpc/RpcService.java | 4 +- .../taskexecutor/JobManagerConnection.java | 25 +- .../runtime/taskexecutor/TaskExecutor.java | 107 ++- .../taskexecutor/rpc/RpcInputSplitProvider.java | 8 +- .../rpc/RpcPartitionStateChecker.java | 8 +- 
.../RpcResultPartitionConsumableNotifier.java | 7 +- .../apache/flink/runtime/akka/AkkaUtils.scala | 6 + .../TestingHighAvailabilityServices.java | 14 + .../jobmaster/JobManagerRunnerMockTest.java | 58 +- .../runtime/jobmaster/JobManagerRunnerTest.java | 24 + 31 files changed, 1278 insertions(+), 1009 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 7227006..0f6f24f 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -99,6 +99,26 @@ public final class ExceptionUtils { } /** + * Throws the given {@code Throwable} in scenarios where the signatures do allow to + * throw an Exception. Errors and Exceptions are thrown directly, other "exotic" + * subclasses of Throwable are wrapped in an Exception. + * + * @param t The throwable to be thrown. + * @param parentMessage The message for the parent Exception, if one is needed. + */ + public static void rethrowException(Throwable t, String parentMessage) throws Exception { + if (t instanceof Error) { + throw (Error) t; + } + else if (t instanceof Exception) { + throw (Exception) t; + } + else { + throw new Exception(parentMessage, t); + } + } + + /** * Tries to throw the given {@code Throwable} in scenarios where the signatures allow only IOExceptions * (and RuntimeException and Error). Throws this exception directly, if it is an IOException, * a RuntimeException, or an Error. Otherwise does nothing. 
http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-core/src/main/java/org/apache/flink/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java index 10b6304..3c32d77 100644 --- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java @@ -335,4 +335,18 @@ public final class StringUtils { return null; } } + + public static boolean isNullOrWhitespaceOnly(String str) { + if (str == null || str.length() == 0) { + return true; + } + + final int len = str.length(); + for (int i = 0; i < len; i++) { + if (!Character.isWhitespace(str.charAt(i))) { + return false; + } + } + return true; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index ff54b67..5395d34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -22,7 +22,11 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import 
org.apache.flink.util.NetUtils; import org.slf4j.Logger; @@ -43,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and @@ -88,23 +93,22 @@ public class BlobServer extends Thread implements BlobService { * thrown if the BLOB server cannot bind to a free network port */ public BlobServer(Configuration config) throws IOException { - checkNotNull(config, "Configuration"); + this(config, createBlobStoreFromConfig(config)); + } - HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); + public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException { + this(config, haServices.createBlobStore()); + } + + private BlobServer(Configuration config, BlobStore blobStore) throws IOException { + checkNotNull(config); + this.blobStore = checkNotNull(blobStore); // configure and create the storage directory String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB server storage directory {}", storageDir); - if (highAvailabilityMode == HighAvailabilityMode.NONE) { - this.blobStore = new VoidBlobStore(); - } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { - this.blobStore = new FileSystemBlobStore(config); - } else { - throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "."); - } - // configure the maximum number of concurrent connections final int maxConnections = config.getInteger( ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); @@ -125,13 +129,7 @@ public class BlobServer extends 
Thread implements BlobService { backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG; } - if (highAvailabilityMode == HighAvailabilityMode.NONE) { - // Add shutdown hook to delete storage directory - this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); - } - else { - this.shutdownHook = null; - } + this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); // ----------------------- start the server ------------------- @@ -426,4 +424,37 @@ public class BlobServer extends Thread implements BlobService { } } + private static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException { + HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); + + if (highAvailabilityMode == HighAvailabilityMode.NONE) { + return new VoidBlobStore(); + } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { + final String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); + if (isNullOrWhitespaceOnly(storagePath)) { + throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + + HighAvailabilityOptions.HA_STORAGE_PATH); + } + + final Path path; + try { + path = new Path(storagePath); + } catch (Exception e) { + throw new IOException("Invalid path for highly available storage (" + + HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + } + + final FileSystem fileSystem; + try { + fileSystem = path.getFileSystem(); + } catch (Exception e) { + throw new IOException("Could not create FileSystem for highly available storage (" + + HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + } + + return new FileSystemBlobStore(fileSystem, storagePath); + } else { + throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "."); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java index 1e72d91..7050338 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java @@ -25,7 +25,7 @@ import java.io.File; /** * A blob store. */ -interface BlobStore { +public interface BlobStore { /** * Copies the local file to the blob store. @@ -93,5 +93,4 @@ interface BlobStore { * Cleans up the store and deletes all blobs. */ void cleanUp(); - } http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index e74fa6f..136df09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import java.io.EOFException; @@ -73,7 +74,7 @@ public class BlobUtils { */ static File initStorageDirectory(String storageDirectory) { File baseDir; - if (storageDirectory == null || storageDirectory.trim().isEmpty()) { + if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) { baseDir = new File(System.getProperty("java.io.tmpdir")); } else { @@ -81,10 +82,9 @@ public class BlobUtils { } File storageDir; - final int MAX_ATTEMPTS = 10; - int attempt; - for(attempt = 0; attempt < MAX_ATTEMPTS; 
attempt++) { + final int MAX_ATTEMPTS = 10; + for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { storageDir = new File(baseDir, String.format( "blobStore-%s", UUID.randomUUID().toString())); http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java index deba738..2c05002 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -20,12 +20,7 @@ package org.apache.flink.runtime.blob; import com.google.common.io.Files; -import org.apache.commons.lang3.StringUtils; - import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.util.IOUtils; @@ -38,7 +33,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URI; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -47,25 +41,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * <p>This is used in addition to the local blob storage for high availability. 
*/ -class FileSystemBlobStore implements BlobStore { +public class FileSystemBlobStore implements BlobStore { private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class); + /** The file system in which blobs are stored */ + private final FileSystem fileSystem; + /** The base path of the blob store */ private final String basePath; - FileSystemBlobStore(Configuration config) throws IOException { - String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); - - if (storagePath == null || StringUtils.isBlank(storagePath)) { - throw new IllegalConfigurationException("Missing high-availability storage path for metadata." + - " Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'."); - } + public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOException { + this.fileSystem = checkNotNull(fileSystem); + this.basePath = checkNotNull(storagePath) + "/blob"; - this.basePath = storagePath + "/blob"; + LOG.info("Creating highly available BLOB storage directory at {}", basePath); - FileSystem.get(new Path(basePath).toUri()).mkdirs(new Path(basePath)); - LOG.info("Created blob directory {}.", basePath); + fileSystem.mkdirs(new Path(basePath)); + LOG.debug("Created highly available BLOB storage directory at {}", basePath); } // - Put ------------------------------------------------------------------ @@ -81,9 +74,7 @@ class FileSystemBlobStore implements BlobStore { } private void put(File fromFile, String toBlobPath) throws Exception { - try (OutputStream os = FileSystem.get(new URI(toBlobPath)) - .create(new Path(toBlobPath), true)) { - + try (OutputStream os = fileSystem.create(new Path(toBlobPath), true)) { LOG.debug("Copying from {} to {}.", fromFile, toBlobPath); Files.copy(fromFile, os); } @@ -106,16 +97,15 @@ class FileSystemBlobStore implements BlobStore { checkNotNull(toFile, "File"); if (!toFile.exists() && !toFile.createNewFile()) { - throw new IllegalStateException("Failed 
to create target file to copy to"); + throw new IOException("Failed to create target file to copy to"); } - final URI fromUri = new URI(fromBlobPath); final Path fromPath = new Path(fromBlobPath); - if (FileSystem.get(fromUri).exists(fromPath)) { - try (InputStream is = FileSystem.get(fromUri).open(fromPath)) { - FileOutputStream fos = new FileOutputStream(toFile); - + if (fileSystem.exists(fromPath)) { + try (InputStream is = fileSystem.open(fromPath); + FileOutputStream fos = new FileOutputStream(toFile)) + { LOG.debug("Copying from {} to {}.", fromBlobPath, toFile); IOUtils.copyBytes(is, fos); // closes the streams } @@ -145,17 +135,16 @@ class FileSystemBlobStore implements BlobStore { private void delete(String blobPath) { try { LOG.debug("Deleting {}.", blobPath); - - FileSystem fs = FileSystem.get(new URI(blobPath)); + Path path = new Path(blobPath); - fs.delete(path, true); + fileSystem.delete(path, true); // send a call to delete the directory containing the file. This will // fail (and be ignored) when some files still exist. 
try { - fs.delete(path.getParent(), false); - fs.delete(new Path(basePath), false); + fileSystem.delete(path.getParent(), false); + fileSystem.delete(new Path(basePath), false); } catch (IOException ignored) {} } catch (Exception e) { @@ -168,7 +157,7 @@ class FileSystemBlobStore implements BlobStore { try { LOG.debug("Cleaning up {}.", basePath); - FileSystem.get(new URI(basePath)).delete(new Path(basePath), true); + fileSystem.delete(new Path(basePath), true); } catch (Exception e) { LOG.error("Failed to clean up recovery directory."); http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java index 1b71add..ece2ac1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java @@ -25,7 +25,7 @@ import java.io.File; /** * A blob store doing nothing. 
*/ -class VoidBlobStore implements BlobStore { +public class VoidBlobStore implements BlobStore { @Override public void put(File localFile, BlobKey blobKey) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java index e448ebc..196ef5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java @@ -23,21 +23,23 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.state.CheckpointStateHandles; +import java.util.UUID; + public interface CheckpointCoordinatorGateway extends RpcGateway { void acknowledgeCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - long checkpointID, - CheckpointStateHandles checkpointStateHandles, - long synchronousDurationMillis, - long asynchronousDurationMillis, - long bytesBufferedInAlignment, - long alignmentDurationNanos); + final JobID jobID, + final ExecutionAttemptID executionAttemptID, + final long checkpointID, + final CheckpointStateHandles checkpointStateHandles, + final long synchronousDurationMillis, + final long asynchronousDurationMillis, + final long bytesBufferedInAlignment, + final long alignmentDurationNanos); void declineCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - long checkpointID, - long checkpointTimestamp); + final JobID jobID, + final ExecutionAttemptID executionAttemptID, + final long checkpointID, + final long checkpointTimestamp); 
} http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index a26886a..5d78ffc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -19,11 +19,14 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.BlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import java.io.IOException; + /** * This class gives access to all services needed for * @@ -72,4 +75,14 @@ public interface HighAvailabilityServices { * Gets the submitted job graph store for the job manager */ SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception; + + /** + * Gets the registry that holds information about whether jobs are currently running. + */ + RunningJobsRegistry getRunningJobsRegistry() throws Exception; + + /** + * Creates the BLOB store in which BLOBs are stored in a highly-available fashion. 
+ */ + BlobStore createBlobStore() throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 2c6295c..d7fd2bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -19,8 +19,11 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.BlobStore; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry; import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; @@ -102,4 +105,14 @@ public class NonHaServices implements HighAvailabilityServices { public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { return new StandaloneSubmittedJobGraphStore(); } + + @Override + public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + return new NonHaRegistry(); + } + + @Override + public BlobStore createBlobStore() { + return new VoidBlobStore(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java new file mode 100644 index 0000000..e7c131c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobID; + +import java.io.IOException; + +/** + * This registry tracks if a certain job is running. + * + * <p>This registry is used in highly-available setups with multiple master nodes, + * to determine whether a new leader should attempt to recover a certain job (because the + * job is still running), or whether the job has already finished successfully (in case of a + * finite job) and the leader has only been granted leadership because the previous leader + * quit cleanly after the job was finished. + */ +public interface RunningJobsRegistry { + + /** + * Marks a job as running. + * + * @param jobID The id of the job. 
+ * + * @throws IOException Thrown when the communication with the highly-available storage or registry + * failed and could not be retried. + */ + void setJobRunning(JobID jobID) throws IOException; + + /** + * Marks a job as finished. + * + * @param jobID The id of the job. + * + * @throws IOException Thrown when the communication with the highly-available storage or registry + * failed and could not be retried. + */ + void setJobFinished(JobID jobID) throws IOException; + + /** + * Checks whether a job is running. + * + * @param jobID The id of the job to check. + * @return True if the job is still running, false otherwise. + * + * @throws IOException Thrown when the communication with the highly-available storage or registry + * failed and could not be retried. + */ + boolean isJobRunning(JobID jobID) throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index d26b668..3a7736b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -19,8 +19,15 @@ package org.apache.flink.runtime.highavailability; import org.apache.curator.framework.CuratorFramework; + import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobStore; +import 
org.apache.flink.runtime.blob.FileSystemBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; @@ -28,12 +35,56 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.ZooKeeperUtils; +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + /** - * An implementation of the {@link HighAvailabilityServices} with zookeeper. + * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper. + * The services store data in ZooKeeper's nodes as illustrated by the following tree structure: + * + * <pre> + * /flink + * +/cluster_id_1/resource_manager_lock + * | | + * | +/job-id-1/job_manager_lock + * | | /checkpoints/latest + * | | /latest-1 + * | | /latest-2 + * | | + * | +/job-id-2/job_manager_lock + * | + * +/cluster_id_2/resource_manager_lock + * | + * +/job-id-1/job_manager_lock + * |/checkpoints/latest + * | /latest-1 + * |/persisted_job_graph + * </pre> + * + * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. + * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to + * accommodate specific permissions. + * + * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster". + * This "cluster" can be either a standalone or containerized Flink cluster, or it can be a job + * on a framework like YARN or Mesos (in a "per-job-cluster" mode). + * + * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured + * automatically by the client or dispatcher that submits the Job to YARN or Mesos. 
+ * + * <p>In the case of a standalone cluster, that cluster-id needs to be configured via + * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same + * cluster and participate in the execution of the same set of jobs. */ public class ZookeeperHaServices implements HighAvailabilityServices { - private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = "/resource-manager"; + private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock"; + + private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock"; + + // ------------------------------------------------------------------------ /** The ZooKeeper client to use */ private final CuratorFramework client; @@ -41,29 +92,37 @@ public class ZookeeperHaServices implements HighAvailabilityServices { /** The runtime configuration */ private final Configuration configuration; + public ZookeeperHaServices(Configuration configuration) { + this(ZooKeeperUtils.startCuratorFramework(configuration), configuration); + } + public ZookeeperHaServices(final CuratorFramework client, final Configuration configuration) { - this.client = client; - this.configuration = configuration; + this.client = checkNotNull(client); + this.configuration = checkNotNull(configuration); } + // ------------------------------------------------------------------------ + // Services + // ------------------------------------------------------------------------ + @Override public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { - return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX); + return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH); } @Override public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { - return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, 
getPathSuffixForJob(jobID)); + return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID)); } @Override public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { - return ZooKeeperUtils.createLeaderElectionService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX); + return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH); } @Override public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { - return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathSuffixForJob(jobID)); + return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID)); } @Override @@ -76,7 +135,43 @@ public class ZookeeperHaServices implements HighAvailabilityServices { return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration); } - private static String getPathSuffixForJob(final JobID jobID) { - return String.format("/job-managers/%s", jobID); + @Override + public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + throw new UnsupportedOperationException("not yet implemented"); + } + + @Override + public BlobStore createBlobStore() throws IOException { + final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); + if (isNullOrWhitespaceOnly(storagePath)) { + throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " + + HighAvailabilityOptions.HA_STORAGE_PATH); + } + + final Path path; + try { + path = new Path(storagePath); + } catch (Exception e) { + throw new IOException("Invalid path for highly available storage (" + + HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + } + + final FileSystem fileSystem; + try { + fileSystem = path.getFileSystem(); + } catch (Exception e) { + throw new IOException("Could not create FileSystem for highly available storage (" + + 
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e); + } + + return new FileSystemBlobStore(fileSystem, storagePath); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static String getPathForJobManager(final JobID jobID) { + return "/" + jobID + JOB_MANAGER_LEADER_PATH; } } http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java new file mode 100644 index 0000000..85dd711 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability.nonha; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry; + +import java.util.HashSet; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A registry for running jobs, not-highly available. + */ +public class NonHaRegistry implements RunningJobsRegistry { + + /** The currently running jobs */ + private final HashSet<JobID> running = new HashSet<>(); + + @Override + public void setJobRunning(JobID jobID) { + checkNotNull(jobID); + + synchronized (running) { + running.add(jobID); + } + } + + @Override + public void setJobFinished(JobID jobID) { + checkNotNull(jobID); + + synchronized (running) { + running.remove(jobID); + } + } + + @Override + public boolean isJobRunning(JobID jobID) { + checkNotNull(jobID); + + synchronized (running) { + return running.contains(jobID); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java index 6de4253..25a2a66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java @@ -19,9 +19,8 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.runtime.rpc.FatalErrorHandler; -public interface OnCompletionActions extends FatalErrorHandler { +public interface OnCompletionActions { void jobFinished(JobExecutionResult result); 
http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index a096932..74c1050 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -21,26 +21,38 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; -import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.UUID; +import static org.apache.flink.util.Preconditions.checkArgument; +import 
static org.apache.flink.util.Preconditions.checkNotNull; + /** * The runner for the job manager. It deals with job level leader election and make underlying job manager * properly reacted. */ -public class JobManagerRunner implements LeaderContender, OnCompletionActions { +public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler { - private final Logger log = LoggerFactory.getLogger(JobManagerRunner.class); + private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class); + + // ------------------------------------------------------------------------ /** Lock to ensure that this runner can deal with leader election event and job completion notifies simultaneously */ private final Object lock = new Object(); @@ -48,52 +60,141 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { /** The job graph needs to run */ private final JobGraph jobGraph; - private final OnCompletionActions toNotify; + /** The listener to notify once the job completes - either successfully or unsuccessfully */ + private final OnCompletionActions toNotifyOnComplete; + + /** The handler to call in case of fatal (unrecoverable) errors */ + private final FatalErrorHandler errorHandler; /** Used to check whether a job needs to be run */ - private final SubmittedJobGraphStore submittedJobGraphStore; + private final RunningJobsRegistry runningJobsRegistry; /** Leader election for this job */ private final LeaderElectionService leaderElectionService; + private final JobManagerServices jobManagerServices; + private final JobMaster jobManager; + private final JobManagerMetricGroup jobManagerMetricGroup; + /** flag marking the runner as shut down */ private volatile boolean shutdown; + // ------------------------------------------------------------------------ + public JobManagerRunner( - final JobGraph jobGraph, - final Configuration configuration, - final RpcService rpcService, - final HighAvailabilityServices 
haServices, - final OnCompletionActions toNotify) throws Exception + final JobGraph jobGraph, + final Configuration configuration, + final RpcService rpcService, + final HighAvailabilityServices haServices, + final OnCompletionActions toNotifyOnComplete, + final FatalErrorHandler errorHandler) throws Exception { this(jobGraph, configuration, rpcService, haServices, - JobManagerServices.fromConfiguration(configuration), toNotify); + new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)), + toNotifyOnComplete, errorHandler); } public JobManagerRunner( - final JobGraph jobGraph, - final Configuration configuration, - final RpcService rpcService, - final HighAvailabilityServices haServices, - final JobManagerServices jobManagerServices, - final OnCompletionActions toNotify) throws Exception + final JobGraph jobGraph, + final Configuration configuration, + final RpcService rpcService, + final HighAvailabilityServices haServices, + final MetricRegistry metricRegistry, + final OnCompletionActions toNotifyOnComplete, + final FatalErrorHandler errorHandler) throws Exception + { + this(jobGraph, configuration, rpcService, haServices, + JobManagerServices.fromConfiguration(configuration, haServices), + metricRegistry, + toNotifyOnComplete, errorHandler); + } + + /** + * + * <p>Exceptions that occur while creating the JobManager or JobManagerRunner are directly + * thrown and not reported to the given {@code FatalErrorHandler}. + * + * <p>This JobManagerRunner assumes that it owns the given {@code JobManagerServices}. + * It will shut them down on error and on calls to {@link #shutdown()}. + * + * @throws Exception Thrown if the runner cannot be set up, because either one of the + * required services could not be started, or the Job could not be initialized. 
+ */ + public JobManagerRunner( + final JobGraph jobGraph, + final Configuration configuration, + final RpcService rpcService, + final HighAvailabilityServices haServices, + final JobManagerServices jobManagerServices, + final MetricRegistry metricRegistry, + final OnCompletionActions toNotifyOnComplete, + final FatalErrorHandler errorHandler) throws Exception { - this.jobGraph = jobGraph; - this.toNotify = toNotify; - this.submittedJobGraphStore = haServices.getSubmittedJobGraphStore(); - this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID()); - - this.jobManager = new JobMaster( - jobGraph, configuration, rpcService, haServices, - jobManagerServices.libraryCacheManager, - jobManagerServices.restartStrategyFactory, - jobManagerServices.savepointStore, - jobManagerServices.timeout, - new Scheduler(jobManagerServices.executorService), - jobManagerServices.jobManagerMetricGroup, - this); + + JobManagerMetricGroup jobManagerMetrics = null; + + // make sure we cleanly shut down out JobManager services if initialization fails + try { + this.jobGraph = checkNotNull(jobGraph); + this.toNotifyOnComplete = checkNotNull(toNotifyOnComplete); + this.errorHandler = checkNotNull(errorHandler); + this.jobManagerServices = checkNotNull(jobManagerServices); + + checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty"); + + final String hostAddress = rpcService.getAddress().isEmpty() ? 
"localhost" : rpcService.getAddress(); + jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress); + this.jobManagerMetricGroup = jobManagerMetrics; + + // libraries and class loader first + final BlobLibraryCacheManager libraryCacheManager = jobManagerServices.libraryCacheManager; + try { + libraryCacheManager.registerJob( + jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths()); + } catch (IOException e) { + throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e); + } + + final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID()); + if (userCodeLoader == null) { + throw new Exception("The user code class loader could not be initialized."); + } + + // high availability services next + this.runningJobsRegistry = haServices.getRunningJobsRegistry(); + this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID()); + + // now start the JobManager + this.jobManager = new JobMaster( + jobGraph, configuration, + rpcService, + haServices, + jobManagerServices.executorService, + jobManagerServices.libraryCacheManager, + jobManagerServices.restartStrategyFactory, + jobManagerServices.savepointStore, + jobManagerServices.rpcAskTimeout, + jobManagerMetrics, + this, + this, + userCodeLoader); + } + catch (Throwable t) { + // clean up everything + try { + jobManagerServices.shutdown(); + } catch (Throwable tt) { + log.error("Error while shutting down JobManager services", tt); + } + + if (jobManagerMetrics != null) { + jobManagerMetrics.close(); + } + + throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", t); + } } //---------------------------------------------------------------------------------------------- @@ -101,9 +202,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { //---------------------------------------------------------------------------------------------- public void 
start() throws Exception { - jobManager.init(); - jobManager.start(); - try { leaderElectionService.start(this); } @@ -114,11 +212,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { } public void shutdown() { - shutdown(new Exception("The JobManager runner is shutting down")); - } - - public void shutdown(Throwable cause) { - // TODO what is the cause used for ? shutdownInternally(); } @@ -129,12 +222,29 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { if (leaderElectionService != null) { try { leaderElectionService.stop(); - } catch (Exception e) { - log.error("Could not properly shutdown the leader election service."); + } catch (Throwable t) { + log.error("Could not properly shutdown the leader election service", t); } } - jobManager.shutDown(); + try { + jobManager.shutDown(); + } catch (Throwable t) { + log.error("Error shutting down JobManager", t); + } + + try { + jobManagerServices.shutdown(); + } catch (Throwable t) { + log.error("Error shutting down JobManager services", t); + } + + // make all registered metrics go away + try { + jobManagerMetricGroup.close(); + } catch (Throwable t) { + log.error("Error while unregistering metrics", t); + } } } @@ -148,11 +258,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { @Override public void jobFinished(JobExecutionResult result) { try { + unregisterJobFromHighAvailability(); shutdownInternally(); } finally { - if (toNotify != null) { - toNotify.jobFinished(result); + if (toNotifyOnComplete != null) { + toNotifyOnComplete.jobFinished(result); } } } @@ -163,11 +274,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { @Override public void jobFailed(Throwable cause) { try { + unregisterJobFromHighAvailability(); shutdownInternally(); } finally { - if (toNotify != null) { - toNotify.jobFailed(cause); + if (toNotifyOnComplete != null) { + toNotifyOnComplete.jobFailed(cause); } } } @@ 
-178,11 +290,12 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { @Override public void jobFinishedByOther() { try { + unregisterJobFromHighAvailability(); shutdownInternally(); } finally { - if (toNotify != null) { - toNotify.jobFinishedByOther(); + if (toNotifyOnComplete != null) { + toNotifyOnComplete.jobFinishedByOther(); } } } @@ -192,18 +305,43 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { */ @Override public void onFatalError(Throwable exception) { - // first and in any case, notify our handler, so it can react fast + // we log first to make sure an explaining message goes into the log + // we even guard the log statement here to increase chances that the error handler + // gets the notification on hard critical situations like out-of-memory errors + try { + log.error("JobManager runner encountered a fatal error.", exception); + } catch (Throwable ignored) {} + + // in any case, notify our handler, so it can react fast try { - if (toNotify != null) { - toNotify.onFatalError(exception); + if (errorHandler != null) { + errorHandler.onFatalError(exception); } } finally { - log.error("JobManager runner encountered a fatal error.", exception); + // the shutdown may not even needed any more, if the fatal error + // handler kills the process. that is fine, a process kill cleans up better than anything. shutdownInternally(); } } + /** + * Marks this runner's job as not running. Other JobManager will not recover the job + * after this call. + * + * <p>This method never throws an exception. + */ + private void unregisterJobFromHighAvailability() { + try { + runningJobsRegistry.setJobFinished(jobGraph.getJobID()); + } + catch (Throwable t) { + log.error("Could not un-register from high-availability services job {} ({})." 
+ + "Other JobManager's may attempt to recover it and re-execute it.", + jobGraph.getName(), jobGraph.getJobID(), t); + } + } + //---------------------------------------------------------------------------------------------- // Leadership methods //---------------------------------------------------------------------------------------------- @@ -223,15 +361,25 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { // it's okay that job manager wait for the operation complete leaderElectionService.confirmLeaderSessionID(leaderSessionID); + boolean jobRunning; + try { + jobRunning = runningJobsRegistry.isJobRunning(jobGraph.getJobID()); + } catch (Throwable t) { + log.error("Could not access status (running/finished) of job {}. " + + "Falling back to assumption that job is running and attempting recovery...", + jobGraph.getJobID(), t); + jobRunning = true; + } + // Double check the leadership after we confirm that, there is a small chance that multiple // job managers schedule the same job after if they try to recover at the same time. // This will eventually be noticed, but can not be ruled out from the beginning. 
if (leaderElectionService.hasLeadership()) { - if (isJobFinishedByOthers()) { + if (jobRunning) { + jobManager.start(leaderSessionID); + } else { log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID()); jobFinishedByOther(); - } else { - jobManager.getSelf().startJob(leaderSessionID); } } } @@ -248,7 +396,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { log.info("JobManager for job {} ({}) was revoked leadership at {}.", jobGraph.getName(), jobGraph.getJobID(), getAddress()); - jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader.")); + jobManager.getSelf().suspendExecution(new Exception("JobManager is no longer the leader.")); } } @@ -263,11 +411,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions { onFatalError(exception); } - @VisibleForTesting - boolean isJobFinishedByOthers() { - // TODO: Fix - return false; - } + //---------------------------------------------------------------------------------------------- + // Testing + //---------------------------------------------------------------------------------------------- @VisibleForTesting boolean isShutdown() { http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java index e6beba6..eebfbfa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java @@ -19,13 +19,21 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; +import 
org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; -import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.util.ExceptionUtils; +import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -42,32 +50,95 @@ public class JobManagerServices { public final SavepointStore savepointStore; - public final Time timeout; - - public final JobManagerMetricGroup jobManagerMetricGroup; + public final Time rpcAskTimeout; public JobManagerServices( ExecutorService executorService, BlobLibraryCacheManager libraryCacheManager, RestartStrategyFactory restartStrategyFactory, SavepointStore savepointStore, - Time timeout, - JobManagerMetricGroup jobManagerMetricGroup) { + Time rpcAskTimeout) { this.executorService = checkNotNull(executorService); this.libraryCacheManager = checkNotNull(libraryCacheManager); this.restartStrategyFactory = checkNotNull(restartStrategyFactory); this.savepointStore = checkNotNull(savepointStore); - this.timeout = checkNotNull(timeout); - this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup); + this.rpcAskTimeout = checkNotNull(rpcAskTimeout); + } + + /** + * + * <p>This method makes sure all services are closed or shut down, even when an exception occurred + * in the shutdown 
of one component. The first encountered exception is thrown, with successive + * exceptions added as suppressed exceptions. + * + * @throws Exception The first Exception encountered during shutdown. + */ + public void shutdown() throws Exception { + Throwable firstException = null; + + try { + executorService.shutdownNow(); + } catch (Throwable t) { + firstException = t; + } + + try { + savepointStore.shutdown(); + } + catch (Throwable t) { + if (firstException == null) { + firstException = t; + } else { + firstException.addSuppressed(t); + } + } + + try { + libraryCacheManager.shutdown(); + } + catch (Throwable t) { + if (firstException == null) { + firstException = t; + } else { + firstException.addSuppressed(t); + } + } + + if (firstException != null) { + ExceptionUtils.rethrowException(firstException, "Error while shutting down JobManager services"); + } } // ------------------------------------------------------------------------ // Creating the components from a configuration // ------------------------------------------------------------------------ - public static JobManagerServices fromConfiguration(Configuration config) throws Exception { - // TODO not yet implemented - return null; + + public static JobManagerServices fromConfiguration( + Configuration config, + HighAvailabilityServices haServices) throws Exception { + + final BlobServer blobServer = new BlobServer(config, haServices); + + final long cleanupInterval = config.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + + final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval); + + final FiniteDuration timeout; + try { + timeout = AkkaUtils.getTimeout(config); + } catch (NumberFormatException e) { + throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage()); + } + + return new JobManagerServices( + new ForkJoinPool(), + 
libraryCacheManager, + RestartStrategyFactory.createRestartStrategyFactory(config), + SavepointStoreFactory.createFromConfig(config), + Time.of(timeout.length(), timeout.unit())); } }