[FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints + MesosTaskExecutorRunner
- bin: new entrypoints scripts for flip-6 - ClusterEntrypoint: Refactor the shutdown method - ClusterEntrypoint: Install default FileSystem (for parity with legacy entrypoints) - ClusterEntrypoint: new MesosJobClusterEntrypoint, MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner - MesosServices: enhanced with artifactServer, localActorSystem - MesosResourceManager: Fallback to old TM params when UNKNOWN resource profile is provided - MesosResourceManager: config setting for taskmanager startup script (mesos.resourcemanager.tasks.taskmanager-cmd) - test: added a 'noop' job graph for testing purposes This closes #4555. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbac4a6c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbac4a6c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbac4a6c Branch: refs/heads/master Commit: bbac4a6c922199db08a5244d0fa1262a5f16d479 Parents: 76f1022 Author: Wright, Eron <eron.wri...@emc.com> Authored: Wed Aug 16 14:30:24 2017 -0700 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sat Aug 19 17:25:21 2017 +0200 ---------------------------------------------------------------------- .../mesos-bin/mesos-appmaster-flip6-job.sh | 47 +++++ .../mesos-bin/mesos-appmaster-flip6-session.sh | 47 +++++ .../mesos-bin/mesos-taskmanager-flip6.sh | 45 ++++ .../mesos/entrypoint/MesosEntrypointUtils.java | 177 ++++++++++++++++ .../entrypoint/MesosJobClusterEntrypoint.java | 204 +++++++++++++++++++ .../MesosSessionClusterEntrypoint.java | 178 ++++++++++++++++ .../entrypoint/MesosTaskExecutorRunner.java | 132 ++++++++++++ .../clusterframework/LaunchableMesosWorker.java | 30 ++- .../MesosApplicationMasterRunner.java | 131 +----------- .../clusterframework/MesosResourceManager.java | 47 +++-- .../MesosTaskManagerParameters.java | 20 +- .../services/AbstractMesosServices.java | 73 +++++++ .../services/MesosServices.java | 17 ++ 
.../services/MesosServicesUtils.java | 28 ++- .../services/StandaloneMesosServices.java | 10 +- .../services/ZooKeeperMesosServices.java | 30 ++- .../MesosFlinkResourceManagerTest.java | 1 + .../MesosResourceManagerTest.java | 48 +++-- .../runtime/entrypoint/ClusterEntrypoint.java | 27 +++ .../entrypoint/JobClusterEntrypoint.java | 10 +- .../entrypoint/SessionClusterEntrypoint.java | 8 +- .../test/runtime/entrypoint/StreamingNoop.java | 60 ++++++ 22 files changed, 1186 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh new file mode 100755 index 0000000..b21670a --- /dev/null +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. "$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` + +log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log" +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +export FLINK_CONF_DIR +export FLINK_BIN_DIR +export FLINK_LIB_DIR + +exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint "$@" + +rc=$? + +if [[ $rc -ne 0 ]]; then + echo "Error while starting the mesos application master. Please check ${log} for more details." +fi + +exit $rc http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh new file mode 100755 index 0000000..b9e0f53 --- /dev/null +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. "$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` + +log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log" +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +export FLINK_CONF_DIR +export FLINK_BIN_DIR +export FLINK_LIB_DIR + +exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint "$@" + +rc=$? + +if [[ $rc -ne 0 ]]; then + echo "Error while starting the mesos application master. Please check ${log} for more details." 
+fi + +exit $rc http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh new file mode 100755 index 0000000..f251442 --- /dev/null +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. 
"$bin"/config.sh + +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` + +log=flink-taskmanager.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +# Add precomputed memory JVM options +if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then + FLINK_ENV_JAVA_OPTS_MEM="" +fi +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}" + +# Add TaskManager-specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" + +export FLINK_CONF_DIR +export FLINK_BIN_DIR +export FLINK_LIB_DIR + +exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner "$@" + http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java new file mode 100755 index 0000000..0d81ead --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.configuration.MesosOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; +import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay; +import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; +import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; + +import org.apache.commons.cli.CommandLine; +import org.apache.mesos.Protos; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +/** + * Utils for Mesos entrypoints.
+ */ +public class MesosEntrypointUtils { + + /** + * Loads the global configuration and adds the dynamic properties parsed from + * the given command line. + * + * @param cmd command line to parse for dynamic properties + * @return Global configuration with dynamic properties set + * @deprecated replace once FLINK-7269 has been merged + */ + @Deprecated + public static Configuration loadConfiguration(CommandLine cmd) { + + // merge the dynamic properties from the command-line + Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + Configuration config = GlobalConfiguration.loadConfiguration(); + + return config; + } + + /** + * Loads and validates the Mesos scheduler configuration. + * @param flinkConfig the global configuration. + * @param hostname the hostname to advertise to the Mesos master. + */ + public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) { + + Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() + .setHostname(hostname); + Protos.Credential.Builder credential = null; + + if (!flinkConfig.contains(MesosOptions.MASTER_URL)) { + throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured."); + } + String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL); + + Duration failoverTimeout = FiniteDuration.apply( + flinkConfig.getInteger( + MesosOptions.FAILOVER_TIMEOUT_SECONDS), + TimeUnit.SECONDS); + frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds()); + + frameworkInfo.setName(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME)); + + frameworkInfo.setRole(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE)); + + frameworkInfo.setUser(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER)); + + if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { + 
frameworkInfo.setPrincipal(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)); + + credential = Protos.Credential.newBuilder(); + credential.setPrincipal(frameworkInfo.getPrincipal()); + + // some environments use a side-channel to communicate the secret to Mesos, + // and thus don't set the 'secret' configuration setting + if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) { + credential.setSecret(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)); + } + } + + MesosConfiguration mesos = + new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential)); + + return mesos; + } + + public static MesosTaskManagerParameters createTmParameters(Configuration configuration, Logger log) { + // TM configuration + final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(configuration); + + log.info("TaskManagers will be created with {} task slots", + taskManagerParameters.containeredParameters().numSlots()); + log.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " + + "JVM direct memory limit {} MB, {} cpus", + taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(), + taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(), + taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(), + taskManagerParameters.cpus()); + + return taskManagerParameters; + } + + public static ContainerSpecification createContainerSpec(Configuration configuration, Configuration dynamicProperties) + throws Exception { + // generate a container spec which conveys the artifacts/vars needed to launch a TM + ContainerSpecification spec = new ContainerSpecification(); + + // propagate the AM dynamic configuration to the TM + spec.getDynamicConfiguration().addAll(dynamicProperties); + + applyOverlays(configuration, spec); + + return spec; + } + + /** + * Generate a container specification as a 
TaskManager template. + * + * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager + * needs (such as JAR file, config file, ...) and all environment variables into a container specification. + * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory. + * A lightweight HTTP server serves the artifacts to the fetcher. + */ + public static void applyOverlays( + Configuration configuration, ContainerSpecification containerSpec) throws IOException { + + // create the overlays that will produce the specification + CompositeContainerOverlay overlay = new CompositeContainerOverlay( + FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(), + HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(), + HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(), + KeytabOverlay.newBuilder().fromEnvironment(configuration).build(), + Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(), + SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build() + ); + + // apply the overlays + overlay.configure(containerSpec); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java new file mode 100755 index 0000000..890c4a7 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Entry point for Mesos per-job clusters. + */ +public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { + + public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + + // ------------------------------------------------------------------------ + // Command-line options + // ------------------------------------------------------------------------ + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + private MesosConfiguration schedulerConfiguration; + + private MesosServices mesosServices; + + private MesosTaskManagerParameters taskManagerParameters; + + private ContainerSpecification taskManagerContainerSpec; + + public MesosJobClusterEntrypoint(Configuration config) { + super(config); + } + + @Override + protected void initializeServices(Configuration config) throws Exception { + super.initializeServices(config); + + final String hostname = config.getString(JobManagerOptions.ADDRESS); + + // Mesos configuration + schedulerConfiguration = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname); + + // services + mesosServices = MesosServicesUtils.createMesosServices(config, hostname); + + // TM configuration + taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG); + 
taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties()); + } + + @Override + protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception { + super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry); + } + + @Override + protected ResourceManager<?> createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + FatalErrorHandler fatalErrorHandler) throws Exception { + final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); + final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); + final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( + rmServicesConfiguration, + highAvailabilityServices, + rpcService.getScheduledExecutor()); + + return new MesosResourceManager( + rpcService, + ResourceManager.RESOURCE_MANAGER_NAME, + resourceId, + rmConfiguration, + highAvailabilityServices, + heartbeatServices, + rmRuntimeServices.getSlotManager(), + metricRegistry, + rmRuntimeServices.getJobLeaderIdService(), + fatalErrorHandler, + configuration, + mesosServices, + schedulerConfiguration, + taskManagerParameters, + taskManagerContainerSpec + ); + } + + @Override + protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException { + String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph"); + File fp = new File(jobGraphFile); + + try 
(FileInputStream input = new FileInputStream(fp); + ObjectInputStream obInput = new ObjectInputStream(input)) { + + return (JobGraph) obInput.readObject(); + } catch (FileNotFoundException e) { + throw new FlinkException("Could not find the JobGraph file.", e); + } catch (ClassNotFoundException | IOException e) { + throw new FlinkException("Could not load the JobGraph from file.", e); + } + } + + @Override + protected void stopClusterComponents(boolean cleanupHaData) throws Exception { + Throwable exception = null; + + try { + super.stopClusterComponents(cleanupHaData); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + if (mesosServices != null) { + try { + mesosServices.close(cleanupHaData); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (exception != null) { + throw new FlinkException("Could not properly shut down the Mesos job cluster entry point.", exception); + } + } + + public static void main(String[] args) { + // load configuration incl. 
dynamic properties + CommandLineParser parser = new PosixParser(); + CommandLine cmd; + try { + cmd = parser.parse(ALL_OPTIONS, args); + } + catch (Exception e){ + LOG.error("Could not parse the command-line options.", e); + System.exit(STARTUP_FAILURE_RETURN_CODE); + return; + } + + Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd); + + MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration); + + clusterEntrypoint.startCluster(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java new file mode 100755 index 0000000..67f5899 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +/** + * Entry point for Mesos session clusters. 
+ */ +public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { + + // ------------------------------------------------------------------------ + // Command-line options + // ------------------------------------------------------------------------ + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + private MesosConfiguration mesosConfig; + + private MesosServices mesosServices; + + private MesosTaskManagerParameters taskManagerParameters; + + private ContainerSpecification taskManagerContainerSpec; + + public MesosSessionClusterEntrypoint(Configuration config) { + super(config); + } + + @Override + protected void initializeServices(Configuration config) throws Exception { + super.initializeServices(config); + + final String hostname = config.getString(JobManagerOptions.ADDRESS); + + // Mesos configuration + mesosConfig = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname); + + // services + mesosServices = MesosServicesUtils.createMesosServices(config, hostname); + + // TM configuration + taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG); + taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties()); + } + + @Override + protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception { + super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry); + } + + @Override + protected ResourceManager<?> createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, 
+ MetricRegistry metricRegistry, + FatalErrorHandler fatalErrorHandler) throws Exception { + final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); + final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); + final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( + rmServicesConfiguration, + highAvailabilityServices, + rpcService.getScheduledExecutor()); + + return new MesosResourceManager( + rpcService, + ResourceManager.RESOURCE_MANAGER_NAME, + resourceId, + rmConfiguration, + highAvailabilityServices, + heartbeatServices, + rmRuntimeServices.getSlotManager(), + metricRegistry, + rmRuntimeServices.getJobLeaderIdService(), + fatalErrorHandler, + configuration, + mesosServices, + mesosConfig, + taskManagerParameters, + taskManagerContainerSpec + ); + } + + @Override + protected void stopClusterComponents(boolean cleanupHaData) throws Exception { + Throwable exception = null; + + try { + super.stopClusterComponents(cleanupHaData); + } catch (Throwable t) { + exception = t; + } + + if (mesosServices != null) { + try { + mesosServices.close(cleanupHaData); + } catch (Throwable t) { + exception = org.apache.flink.util.ExceptionUtils.firstOrSuppressed(t, exception); // keep the first failure as the cause; attach later ones as suppressed + } + } + + if (exception != null) { + throw new FlinkException("Could not properly shut down the Mesos session cluster entry point.", exception); + } + } + + public static void main(String[] args) { + // load configuration incl.
dynamic properties + CommandLineParser parser = new PosixParser(); + CommandLine cmd; + try { + cmd = parser.parse(ALL_OPTIONS, args); + } + catch (Exception e){ + LOG.error("Could not parse the command-line options.", e); + System.exit(STARTUP_FAILURE_RETURN_CODE); + return; + } + + Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd); + + MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration); + + clusterEntrypoint.startCluster(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java new file mode 100644 index 0000000..c4343d2 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * The entry point for running a TaskManager in a Mesos container. 
+ */ +public class MesosTaskExecutorRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskExecutorRunner.class); + + private static final int INIT_ERROR_EXIT_CODE = 31; + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + public static void main(String[] args) throws Exception { + EnvironmentInformation.logEnvironmentInfo(LOG, MesosTaskExecutorRunner.class.getSimpleName(), args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + // try to parse the command line arguments + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(ALL_OPTIONS, args); + + final Configuration configuration; + try { + configuration = MesosEntrypointUtils.loadConfiguration(cmd); + } + catch (Throwable t) { + LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t); + System.exit(INIT_ERROR_EXIT_CODE); + return; + } + + // read the environment variables + final Map<String, String> envs = System.getenv(); + final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR); + + // configure local directory + String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); + if (flinkTempDirs != null) { + LOG.info("Overriding Mesos temporary file directories with those " + + "specified in the Flink config: {}", flinkTempDirs); + } + else if (tmpDirs != null) { + LOG.info("Setting directories for temporary files to: {}", tmpDirs); + configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs); + } + + // configure the default filesystem + try { + FileSystem.setDefaultScheme(configuration); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); + } + + // tell akka to die in case of an error + configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true); + + // Infer the resource identifier from the environment variable + String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID)); + final ResourceID resourceId = new ResourceID(containerID); + LOG.info("ResourceID assigned for this container: {}", resourceId); + + // Run the TM in the security context + SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration); + SecurityUtils.install(sc); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { + @Override + public Integer call() throws Exception { + TaskManagerRunner.runTaskManager(configuration, resourceId); + + return 0; + } + }); + } + catch (Throwable t) { + LOG.error("Error while starting the TaskManager", t); + System.exit(INIT_ERROR_EXIT_CODE); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index ce7bb9d..2c32507 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.mesos.Utils; import org.apache.flink.mesos.scheduler.LaunchableTask; import org.apache.flink.mesos.util.MesosArtifactResolver; +import org.apache.flink.mesos.util.MesosArtifactServer; import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -36,6 +37,7 @@ import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -261,12 +263,15 @@ public class LaunchableMesosWorker implements LaunchableTask { env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName())); // build the launch command w/ dynamic application properties - Option<String> bootstrapCmdOption = params.bootstrapCommand(); - - final String bootstrapCommand = bootstrapCmdOption.isDefined() ? bootstrapCmdOption.get() + " && " : ""; - final String launchCommand = bootstrapCommand + "$FLINK_HOME/bin/mesos-taskmanager.sh " + ContainerSpecification.formatSystemProperties(dynamicProperties); - - cmd.setValue(launchCommand); + StringBuilder launchCommand = new StringBuilder(); + if (params.bootstrapCommand().isDefined()) { + launchCommand.append(params.bootstrapCommand().get()).append(" && "); + } + launchCommand + .append(params.command()) + .append(" ") + .append(ContainerSpecification.formatSystemProperties(dynamicProperties)); + cmd.setValue(launchCommand.toString()); // build the container info Protos.ContainerInfo.Builder containerInfo = Protos.ContainerInfo.newBuilder(); @@ -312,4 +317,17 @@ public class LaunchableMesosWorker implements LaunchableTask { "taskRequest=" + taskRequest + '}'; }
+
+	/**
+	 * Registers every artifact of a container specification with an artifact server, so that the
+	 * artifacts can be fetched into the container.
+	 *
+	 * @param server the server to configure.
+	 * @param container the container with artifacts to serve.
+	 * @throws IOException if the artifacts cannot be accessed.
+	 */
+	static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
+		for (ContainerSpecification.Artifact containerArtifact : container.getArtifacts()) {
+			// expose the artifact's source under its destination path in the container
+			server.addPath(containerArtifact.source, containerArtifact.dest);
+		}
+	}
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java old mode 100644 new mode 100755 index 7891386..3d16a66 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -21,11 +21,10 @@ package org.apache.flink.mesos.runtime.clusterframework; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.mesos.configuration.MesosOptions; +import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; @@ -34,13 +33,6 @@ import org.apache.flink.mesos.util.MesosConfiguration; import
org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContainerSpecification; -import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; -import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; -import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay; -import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay; -import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; -import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; -import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; @@ -65,7 +57,6 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; -import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +64,6 @@ import java.io.IOException; import java.net.InetAddress; import java.net.URL; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -81,7 +71,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import scala.Option; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import static org.apache.flink.util.Preconditions.checkState; @@ -222,7 +211,7 @@ public class MesosApplicationMasterRunner { LOG.info("App Master Hostname to use: {}", appMasterHostname); // Mesos configuration - final MesosConfiguration mesosConfig = createMesosConfig(config, 
appMasterHostname); + final MesosConfiguration mesosConfig = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, appMasterHostname); // JM configuration int numberProcessors = Hardware.getNumberCPUCores(); @@ -235,19 +224,10 @@ public class MesosApplicationMasterRunner { numberProcessors, new ExecutorThreadFactory("mesos-jobmanager-io")); - mesosServices = MesosServicesUtils.createMesosServices(config); + mesosServices = MesosServicesUtils.createMesosServices(config, appMasterHostname); // TM configuration - final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config); - - LOG.info("TaskManagers will be created with {} task slots", - taskManagerParameters.containeredParameters().numSlots()); - LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " + - "JVM direct memory limit {} MB, {} cpus", - taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(), - taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(), - taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(), - taskManagerParameters.cpus()); + final MesosTaskManagerParameters taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG); // JM endpoint, which should be explicitly configured based on acquired net resources final int listeningPort = config.getInteger(JobManagerOptions.PORT); @@ -268,9 +248,7 @@ public class MesosApplicationMasterRunner { // try to start the artifact server LOG.debug("Starting Artifact Server"); - final int artifactServerPort = config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT); - final String artifactServerPrefix = UUID.randomUUID().toString(); - artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config); + artifactServer = mesosServices.getArtifactServer(); // ----------------- (3) Generate the configuration for the TaskManagers ------------------- @@ -287,10 +265,10 @@ public 
class MesosApplicationMasterRunner { taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig); // apply the overlays - applyOverlays(config, taskManagerContainerSpec); + MesosEntrypointUtils.applyOverlays(config, taskManagerContainerSpec); // configure the artifact server to serve the specified artifacts - configureArtifactServer(artifactServer, taskManagerContainerSpec); + LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec); // ----------------- (4) start the actors ------------------- @@ -386,14 +364,6 @@ public class MesosApplicationMasterRunner { } } - if (artifactServer != null) { - try { - artifactServer.stop(); - } catch (Throwable ignored) { - LOG.error("Failed to stop the artifact server", ignored); - } - } - if (actorSystem != null) { try { actorSystem.shutdown(); @@ -444,12 +414,6 @@ public class MesosApplicationMasterRunner { } } - try { - artifactServer.stop(); - } catch (Throwable t) { - LOG.error("Failed to stop the artifact server", t); - } - if (highAvailabilityServices != null) { try { highAvailabilityServices.close(); @@ -490,85 +454,4 @@ public class MesosApplicationMasterRunner { return MemoryArchivist.class; } - /** - * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration. 
- */ - public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) { - - Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() - .setHostname(hostname); - Protos.Credential.Builder credential = null; - - if (!flinkConfig.contains(MesosOptions.MASTER_URL)) { - throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured."); - } - String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL); - - Duration failoverTimeout = FiniteDuration.apply( - flinkConfig.getInteger( - MesosOptions.FAILOVER_TIMEOUT_SECONDS), - TimeUnit.SECONDS); - frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds()); - - frameworkInfo.setName(flinkConfig.getString( - MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME)); - - frameworkInfo.setRole(flinkConfig.getString( - MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE)); - - frameworkInfo.setUser(flinkConfig.getString( - MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER)); - - if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { - frameworkInfo.setPrincipal(flinkConfig.getString( - MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)); - - credential = Protos.Credential.newBuilder(); - credential.setPrincipal(frameworkInfo.getPrincipal()); - - // some environments use a side-channel to communicate the secret to Mesos, - // and thus don't set the 'secret' configuration setting - if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) { - credential.setSecret(flinkConfig.getString( - MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)); - } - } - - MesosConfiguration mesos = - new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential)); - - return mesos; - } - - /** - * Generate a container specification as a TaskManager template. - * - * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager - * needs (such as JAR file, config file, ...) 
and all environment variables into a container specification. - * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory. - * A lightweight HTTP server serves the artifacts to the fetcher. - */ - private static void applyOverlays( - Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException { - - // create the overlays that will produce the specification - CompositeContainerOverlay overlay = new CompositeContainerOverlay( - FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), - HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), - HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), - KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), - Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), - SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build() - ); - - // apply the overlays - overlay.configure(containerSpec); - } - - private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException { - // serve the artifacts associated with the container environment - for (ContainerSpecification.Artifact artifact : container.getArtifacts()) { - server.addPath(artifact.source, artifact.dest); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 736af59..445010b 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -20,6 +20,7 @@ package org.apache.flink.mesos.runtime.clusterframework; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; @@ -38,7 +39,7 @@ import org.apache.flink.mesos.scheduler.messages.Registered; import org.apache.flink.mesos.scheduler.messages.ResourceOffers; import org.apache.flink.mesos.scheduler.messages.SlaveLost; import org.apache.flink.mesos.scheduler.messages.StatusUpdate; -import org.apache.flink.mesos.util.MesosArtifactResolver; +import org.apache.flink.mesos.util.MesosArtifactServer; import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.ContainerSpecification; @@ -75,6 +76,7 @@ import org.apache.mesos.SchedulerDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -98,17 +100,20 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN /** The Mesos configuration (master and framework info). */ private final MesosConfiguration mesosConfig; + /** The Mesos services needed by the resource manager. */ + private final MesosServices mesosServices; + /** The TaskManager container parameters (like container memory size). */ private final MesosTaskManagerParameters taskManagerParameters; /** Container specification for launching a TM. */ private final ContainerSpecification taskManagerContainerSpec; - /** Resolver for HTTP artifacts. 
*/ - private final MesosArtifactResolver artifactResolver; + /** Server for HTTP artifacts. */ + private final MesosArtifactServer artifactServer; /** Persistent storage of allocated containers. */ - private final MesosWorkerStore workerStore; + private MesosWorkerStore workerStore; /** A local actor system for using the helper actors. */ private final ActorSystem actorSystem; @@ -145,13 +150,11 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler, // Mesos specifics - ActorSystem actorSystem, Configuration flinkConfig, + MesosServices mesosServices, MesosConfiguration mesosConfig, - MesosWorkerStore workerStore, MesosTaskManagerParameters taskManagerParameters, - ContainerSpecification taskManagerContainerSpec, - MesosArtifactResolver artifactResolver) { + ContainerSpecification taskManagerContainerSpec) { super( rpcService, resourceManagerEndpointId, @@ -164,13 +167,13 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN jobLeaderIdService, fatalErrorHandler); - this.actorSystem = Preconditions.checkNotNull(actorSystem); + this.mesosServices = Preconditions.checkNotNull(mesosServices); + this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem()); this.flinkConfig = Preconditions.checkNotNull(flinkConfig); this.mesosConfig = Preconditions.checkNotNull(mesosConfig); - this.workerStore = Preconditions.checkNotNull(workerStore); - this.artifactResolver = Preconditions.checkNotNull(artifactResolver); + this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer()); this.taskManagerParameters = Preconditions.checkNotNull(taskManagerParameters); this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec); @@ -221,8 +224,9 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN */ @Override protected void initialize() throws 
ResourceManagerException { - // start the worker store + // create and start the worker store try { + this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, getRpcService().getExecutor()); workerStore.start(); } catch (Exception e) { throw new ResourceManagerException("Unable to initialize the worker store.", e); @@ -266,6 +270,14 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN throw new ResourceManagerException("Unable to recover Mesos worker state.", e); } + // configure the artifact server to serve the TM container artifacts + try { + LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec); + } + catch (IOException e) { + throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e); + } + // begin scheduling connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor); schedulerDriver.start(); @@ -627,20 +639,23 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN taskManagerParameters.containerType(), taskManagerParameters.containerImageName(), new ContaineredTaskManagerParameters( - resourceProfile.getMemoryInMB() < 0 ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(), - resourceProfile.getHeapMemoryInMB(), - resourceProfile.getDirectMemoryInMB(), + ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(), + ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : resourceProfile.getHeapMemoryInMB(), + ResourceProfile.UNKNOWN.equals(resourceProfile) ? 
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : resourceProfile.getDirectMemoryInMB(), 1, new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())), taskManagerParameters.containerVolumes(), taskManagerParameters.constraints(), + taskManagerParameters.command(), taskManagerParameters.bootstrapCommand(), taskManagerParameters.getTaskManagerHostname() ); + LOG.debug("LaunchableMesosWorker parameters: {}", params); + LaunchableMesosWorker launchable = new LaunchableMesosWorker( - artifactResolver, + artifactServer, params, taskManagerContainerSpec, taskID, http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java index f5a415e..3859913 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java @@ -74,6 +74,10 @@ public class MesosTaskManagerParameters { key("mesos.resourcemanager.tasks.hostname") .noDefaultValue(); + public static final ConfigOption<String> MESOS_TM_CMD = + key("mesos.resourcemanager.tasks.taskmanager-cmd") + .defaultValue("$FLINK_HOME/bin/mesos-taskmanager.sh"); // internal + public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD = key("mesos.resourcemanager.tasks.bootstrap-cmd") .noDefaultValue(); @@ -107,6 +111,8 @@ public class MesosTaskManagerParameters { private final List<ConstraintEvaluator> constraints; + private final String command; + private final Option<String> bootstrapCommand; private final Option<String> 
taskManagerHostname; @@ -118,6 +124,7 @@ public class MesosTaskManagerParameters { ContaineredTaskManagerParameters containeredParameters, List<Protos.Volume> containerVolumes, List<ConstraintEvaluator> constraints, + String command, Option<String> bootstrapCommand, Option<String> taskManagerHostname) { @@ -127,6 +134,7 @@ public class MesosTaskManagerParameters { this.containeredParameters = Preconditions.checkNotNull(containeredParameters); this.containerVolumes = Preconditions.checkNotNull(containerVolumes); this.constraints = Preconditions.checkNotNull(constraints); + this.command = Preconditions.checkNotNull(command); this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand); this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname); } @@ -183,6 +191,13 @@ public class MesosTaskManagerParameters { } /** + * Get the command. + */ + public String command() { + return command; + } + + /** * Get the bootstrap command. */ public Option<String> bootstrapCommand() { @@ -199,6 +214,7 @@ public class MesosTaskManagerParameters { ", containerVolumes=" + containerVolumes + ", constraints=" + constraints + ", taskManagerHostName=" + taskManagerHostname + + ", command=" + command + ", bootstrapCommand=" + bootstrapCommand + '}'; } @@ -249,7 +265,8 @@ public class MesosTaskManagerParameters { //obtain Task Manager Host Name from the configuration Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME)); - //obtain bootstrap command from the configuration + //obtain command-line from the configuration + String tmCommand = flinkConfig.getString(MESOS_TM_CMD); Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD)); return new MesosTaskManagerParameters( @@ -259,6 +276,7 @@ public class MesosTaskManagerParameters { containeredParameters, containerVolumes, constraints, + tmCommand, tmBootstrapCommand, taskManagerHostname); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java new file mode 100644 index 0000000..e4f4cf7 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.services; + +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import akka.actor.ActorSystem; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An abrstact implementation of {@link MesosServices}. 
+ */ +public abstract class AbstractMesosServices implements MesosServices { + + private final ActorSystem actorSystem; + + private final MesosArtifactServer artifactServer; + + protected AbstractMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer) { + this.actorSystem = checkNotNull(actorSystem); + this.artifactServer = checkNotNull(artifactServer); + } + + @Override + public ActorSystem getLocalActorSystem() { + return actorSystem; + } + + @Override + public MesosArtifactServer getArtifactServer() { + return artifactServer; + } + + @Override + public void close(boolean cleanup) throws Exception { + Throwable exception = null; + + try { + actorSystem.shutdown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + try { + artifactServer.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + if (exception != null) { + throw new FlinkException("Could not properly shut down the Mesos services.", exception); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java index 5655bfc..6a64f4f 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java @@ -20,6 +20,9 @@ package org.apache.flink.mesos.runtime.clusterframework.services; import org.apache.flink.configuration.Configuration; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import 
org.apache.flink.mesos.util.MesosArtifactServer; + +import akka.actor.ActorSystem; import java.util.concurrent.Executor; @@ -42,6 +45,20 @@ public interface MesosServices { Executor executor) throws Exception; /** + * Gets a local {@link ActorSystem} which is used for child actors within + * {@link org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager}. + * + * @return a reference to an actor system. + */ + ActorSystem getLocalActorSystem(); + + /** + * Gets the artifact server with which to serve essential resources to task managers. + * @return a reference to an artifact server. + */ + MesosArtifactServer getArtifactServer(); + + /** * Closes all state maintained by the mesos services implementation. * * @param cleanup is true if a cleanup shall be performed http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java index 370a760..c5a8516 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java @@ -20,9 +20,16 @@ package org.apache.flink.mesos.runtime.clusterframework.services; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.mesos.configuration.MesosOptions; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import 
org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory; +import akka.actor.ActorSystem; + +import java.util.UUID; + /** * Utilities for the {@link MesosServices}. */ @@ -32,15 +39,21 @@ public class MesosServicesUtils { * Creates a {@link MesosServices} instance depending on the high availability settings. * * @param configuration containing the high availability settings + * @param hostname the hostname to advertise to remote clients * @return a mesos services instance * @throws Exception if the mesos services instance could not be created */ - public static MesosServices createMesosServices(Configuration configuration) throws Exception { + public static MesosServices createMesosServices(Configuration configuration, String hostname) throws Exception { + + ActorSystem localActorSystem = AkkaUtils.createLocalActorSystem(configuration); + + MesosArtifactServer artifactServer = createArtifactServer(configuration, hostname); + HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration); switch (highAvailabilityMode) { case NONE: - return new StandaloneMesosServices(); + return new StandaloneMesosServices(localActorSystem, artifactServer); case ZOOKEEPER: final String zkMesosRootPath = configuration.getString( @@ -50,10 +63,19 @@ public class MesosServicesUtils { configuration, zkMesosRootPath); - return new ZooKeeperMesosServices(zooKeeperUtilityFactory); + return new ZooKeeperMesosServices(localActorSystem, artifactServer, zooKeeperUtilityFactory); default: throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); } } + + private static MesosArtifactServer createArtifactServer(Configuration configuration, String hostname) throws Exception { + final int artifactServerPort = configuration.getInteger(MesosOptions.ARTIFACT_SERVER_PORT, 0); + + // a random prefix is affixed to artifact URLs to ensure uniqueness in the Mesos fetcher cache + final String artifactServerPrefix = 
UUID.randomUUID().toString(); + + return new MesosArtifactServer(artifactServerPrefix, hostname, artifactServerPort, configuration); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java index aa3157f..b93fd29 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java @@ -21,13 +21,20 @@ package org.apache.flink.mesos.runtime.clusterframework.services; import org.apache.flink.configuration.Configuration; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; + +import akka.actor.ActorSystem; import java.util.concurrent.Executor; /** * {@link MesosServices} implementation for the standalone mode. 
*/ -public class StandaloneMesosServices implements MesosServices { +public class StandaloneMesosServices extends AbstractMesosServices { + + protected StandaloneMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer) { + super(actorSystem, artifactServer); + } @Override public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor) { @@ -36,5 +43,6 @@ public class StandaloneMesosServices implements MesosServices { @Override public void close(boolean cleanup) throws Exception { + super.close(cleanup); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java index 2883e4f..069cb83 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java @@ -21,25 +21,31 @@ package org.apache.flink.mesos.runtime.clusterframework.services; import org.apache.flink.configuration.Configuration; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount; import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import 
org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import akka.actor.ActorSystem; + import java.util.concurrent.Executor; /** * {@link MesosServices} implementation for the ZooKeeper high availability based mode. */ -public class ZooKeeperMesosServices implements MesosServices { +public class ZooKeeperMesosServices extends AbstractMesosServices { // Factory to create ZooKeeper utility classes private final ZooKeeperUtilityFactory zooKeeperUtilityFactory; - public ZooKeeperMesosServices(ZooKeeperUtilityFactory zooKeeperUtilityFactory) { + public ZooKeeperMesosServices(ActorSystem actorSystem, MesosArtifactServer artifactServer, ZooKeeperUtilityFactory zooKeeperUtilityFactory) { + super(actorSystem, artifactServer); this.zooKeeperUtilityFactory = Preconditions.checkNotNull(zooKeeperUtilityFactory); } @@ -64,7 +70,23 @@ public class ZooKeeperMesosServices implements MesosServices { @Override public void close(boolean cleanup) throws Exception { - // this also closes the underlying CuratorFramework instance - zooKeeperUtilityFactory.close(cleanup); + Throwable exception = null; + + try { + // this also closes the underlying CuratorFramework instance + zooKeeperUtilityFactory.close(cleanup); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + try { + super.close(cleanup); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + if (exception != null) { + throw new FlinkException("Could not properly shut down the Mesos services.", exception); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java index 8bfb4d1..ff32486 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java @@ -251,6 +251,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger { containeredParams, Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), + "", Option.<String>empty(), Option.<String>empty()); http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index e81a2de..4bbcb25 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.mesos.runtime.clusterframework; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; @@ -32,7 +33,7 @@ import 
org.apache.flink.mesos.scheduler.messages.ReRegistered; import org.apache.flink.mesos.scheduler.messages.Registered; import org.apache.flink.mesos.scheduler.messages.ResourceOffers; import org.apache.flink.mesos.scheduler.messages.StatusUpdate; -import org.apache.flink.mesos.util.MesosArtifactResolver; +import org.apache.flink.mesos.util.MesosArtifactServer; import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -159,17 +160,15 @@ public class MesosResourceManagerTest extends TestLogger { FatalErrorHandler fatalErrorHandler, // Mesos specifics - ActorSystem actorSystem, Configuration flinkConfig, + MesosServices mesosServices, MesosConfiguration mesosConfig, - MesosWorkerStore workerStore, MesosTaskManagerParameters taskManagerParameters, - ContainerSpecification taskManagerContainerSpec, - MesosArtifactResolver artifactResolver) { + ContainerSpecification taskManagerContainerSpec) { super(rpcService, resourceManagerEndpointId, resourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, metricRegistry, - jobLeaderIdService, fatalErrorHandler, actorSystem, flinkConfig, mesosConfig, workerStore, - taskManagerParameters, taskManagerContainerSpec, artifactResolver); + jobLeaderIdService, fatalErrorHandler, flinkConfig, mesosServices, mesosConfig, + taskManagerParameters, taskManagerContainerSpec); } @Override @@ -208,6 +207,7 @@ public class MesosResourceManagerTest extends TestLogger { TestingRpcService rpcService; TestingFatalErrorHandler fatalErrorHandler; MockMesosResourceManagerRuntimeServices rmServices; + MockMesosServices mesosServices; // RM ResourceManagerConfiguration rmConfiguration; @@ -242,6 +242,7 @@ public class MesosResourceManagerTest extends TestLogger { rpcService = new TestingRpcService(); fatalErrorHandler = new TestingFatalErrorHandler(); rmServices = new 
MockMesosResourceManagerRuntimeServices(); + mesosServices = new MockMesosServices(); // TaskExecutor templating ContainerSpecification containerSpecification = new ContainerSpecification(); @@ -249,7 +250,7 @@ public class MesosResourceManagerTest extends TestLogger { new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>()); MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters( 1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams, - Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), Option.<String>empty(), + Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(), Option.<String>empty()); // resource manager @@ -270,13 +271,11 @@ public class MesosResourceManagerTest extends TestLogger { rmServices.jobLeaderIdService, fatalErrorHandler, // Mesos specifics - system, flinkConfig, + mesosServices, rmServices.mesosConfig, - rmServices.workerStore, tmParams, - containerSpecification, - rmServices.artifactResolver + containerSpecification ); // TaskExecutors @@ -341,7 +340,7 @@ public class MesosResourceManagerTest extends TestLogger { public SchedulerDriver schedulerDriver; public MesosConfiguration mesosConfig; public MesosWorkerStore workerStore; - public MesosArtifactResolver artifactResolver; + public MesosArtifactServer artifactServer; MockMesosResourceManagerRuntimeServices() throws Exception { schedulerDriver = mock(SchedulerDriver.class); @@ -354,7 +353,28 @@ public class MesosResourceManagerTest extends TestLogger { workerStore = mock(MesosWorkerStore.class); when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty()); - artifactResolver = mock(MesosArtifactResolver.class); + artifactServer = mock(MesosArtifactServer.class); + } + } + + class MockMesosServices implements MesosServices { + @Override + public MesosWorkerStore createMesosWorkerStore(Configuration 
configuration, Executor executor) throws Exception { + return rmServices.workerStore; + } + + @Override + public ActorSystem getLocalActorSystem() { + return system; + } + + @Override + public MesosArtifactServer getArtifactServer() { + return rmServices.artifactServer; + } + + @Override + public void close(boolean cleanup) throws Exception { } } http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java old mode 100644 new mode 100755 index 2538f20..1551933 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.BootstrapTools; @@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; +import java.io.IOException; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -92,6 +94,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { LOG.info("Starting {}.", getClass().getSimpleName()); try { + installDefaultFileSystem(configuration); + SecurityContext securityContext = installSecurityContext(configuration); securityContext.runSecured(new Callable<Void>() { @@ -115,6 +119,17 @@ public abstract 
class ClusterEntrypoint implements FatalErrorHandler { } } + protected void installDefaultFileSystem(Configuration configuration) throws Exception { + LOG.info("Install default filesystem."); + + try { + FileSystem.setDefaultScheme(configuration); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); + } + } + protected SecurityContext installSecurityContext(Configuration configuration) throws Exception { LOG.info("Install security context."); @@ -184,9 +199,18 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { } protected void shutDown(boolean cleanupHaData) throws FlinkException { + LOG.info("Stopping {}.", getClass().getSimpleName()); + Throwable exception = null; synchronized (lock) { + + try { + stopClusterComponents(cleanupHaData); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + if (metricRegistry != null) { try { metricRegistry.shutdown(); @@ -244,6 +268,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception; + protected void stopClusterComponents(boolean cleanupHaData) throws Exception { + } + protected static ClusterConfiguration parseArguments(String[] args) { ParameterTool parameterTool = ParameterTool.fromArgs(args); http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index a7c6120..e70f6c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -110,7 +110,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { } @Override - protected void shutDown(boolean cleanupHaData) throws FlinkException { + protected void stopClusterComponents(boolean cleanupHaData) throws Exception { Throwable exception = null; if (jobManagerRunner != null) { @@ -129,14 +129,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { } } - try { - super.shutDown(cleanupHaData); - } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); - } - if (exception != null) { - throw new FlinkException("Could not properly shut down the session cluster entry point.", exception); + throw new FlinkException("Could not properly shut down the job cluster entry point.", exception); } }