[FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints + 
MesosTaskExecutorRunner

- bin: new entrypoints scripts for flip-6
- ClusterEntrypoint: Refactor the shutdown method
- ClusterEntrypoint: Install default FileSystem (for parity with legacy 
entrypoints)
- ClusterEntrypoint: new MesosJobClusterEntrypoint, 
MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner
- MesosServices: enhanced with artifactServer, localActorSystem
- MesosResourceManager: Fallback to old TM params when UNKNOWN resource profile 
is provided
- MesosResourceManager: config setting for taskmanager startup script 
(mesos.resourcemanager.tasks.taskmanager-cmd)
- test: added a 'noop' job graph for testing purposes

This closes #4555.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbac4a6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbac4a6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbac4a6c

Branch: refs/heads/master
Commit: bbac4a6c922199db08a5244d0fa1262a5f16d479
Parents: 76f1022
Author: Wright, Eron <eron.wri...@emc.com>
Authored: Wed Aug 16 14:30:24 2017 -0700
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sat Aug 19 17:25:21 2017 +0200

----------------------------------------------------------------------
 .../mesos-bin/mesos-appmaster-flip6-job.sh      |  47 +++++
 .../mesos-bin/mesos-appmaster-flip6-session.sh  |  47 +++++
 .../mesos-bin/mesos-taskmanager-flip6.sh        |  45 ++++
 .../mesos/entrypoint/MesosEntrypointUtils.java  | 177 ++++++++++++++++
 .../entrypoint/MesosJobClusterEntrypoint.java   | 204 +++++++++++++++++++
 .../MesosSessionClusterEntrypoint.java          | 178 ++++++++++++++++
 .../entrypoint/MesosTaskExecutorRunner.java     | 132 ++++++++++++
 .../clusterframework/LaunchableMesosWorker.java |  30 ++-
 .../MesosApplicationMasterRunner.java           | 131 +-----------
 .../clusterframework/MesosResourceManager.java  |  47 +++--
 .../MesosTaskManagerParameters.java             |  20 +-
 .../services/AbstractMesosServices.java         |  73 +++++++
 .../services/MesosServices.java                 |  17 ++
 .../services/MesosServicesUtils.java            |  28 ++-
 .../services/StandaloneMesosServices.java       |  10 +-
 .../services/ZooKeeperMesosServices.java        |  30 ++-
 .../MesosFlinkResourceManagerTest.java          |   1 +
 .../MesosResourceManagerTest.java               |  48 +++--
 .../runtime/entrypoint/ClusterEntrypoint.java   |  27 +++
 .../entrypoint/JobClusterEntrypoint.java        |  10 +-
 .../entrypoint/SessionClusterEntrypoint.java    |   8 +-
 .../test/runtime/entrypoint/StreamingNoop.java  |  60 ++++++
 22 files changed, 1186 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
----------------------------------------------------------------------
diff --git 
a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh 
b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
new file mode 100755
index 0000000..b21670a
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-job.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=`manglePathList 
$(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
+
+log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
+log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting 
org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint "$@"
+
+rc=$?
+
+if [[ $rc -ne 0 ]]; then
+    echo "Error while starting the mesos application master. Please check 
${log} for more details."
+fi
+
+exit $rc

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
----------------------------------------------------------------------
diff --git 
a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh 
b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
new file mode 100755
index 0000000..b9e0f53
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster-flip6-session.sh
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=`manglePathList 
$(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
+
+log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
+log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+exec $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting 
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint "$@"
+
+rc=$?
+
+if [[ $rc -ne 0 ]]; then
+    echo "Error while starting the mesos application master. Please check 
${log} for more details."
+fi
+
+exit $rc

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh 
b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
new file mode 100755
index 0000000..f251442
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager-flip6.sh
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+CC_CLASSPATH=`manglePathList 
$(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
+
+log=flink-taskmanager.log
+log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+# Add precomputed memory JVM options
+if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
+    FLINK_ENV_JAVA_OPTS_MEM=""
+fi
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"
+
+# Add TaskManager-specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" 
$log_setting org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner "$@"
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
new file mode 100755
index 0000000..0d81ead
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import 
org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import 
org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utils for Mesos entrpoints.
+ */
+public class MesosEntrypointUtils {
+
+       /**
+        * Loads the global configuration and adds the dynamic properties 
parsed from
+        * the given command line.
+        *
+        * @param cmd command line to parse for dynamic properties
+        * @return Global configuration with dynamic properties set
+        * @deprecated replace once FLINK-7269 has been merged
+        */
+       @Deprecated
+       public static Configuration loadConfiguration(CommandLine cmd) {
+
+               // merge the dynamic properties from the command-line
+               Configuration dynamicProperties = 
BootstrapTools.parseDynamicProperties(cmd);
+               GlobalConfiguration.setDynamicProperties(dynamicProperties);
+               Configuration config = GlobalConfiguration.loadConfiguration();
+
+               return config;
+       }
+
+       /**
+        * Loads and validates the Mesos scheduler configuration.
+        * @param flinkConfig the global configuration.
+        * @param hostname the hostname to advertise to the Mesos master.
+        */
+       public static MesosConfiguration 
createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) {
+
+               Protos.FrameworkInfo.Builder frameworkInfo = 
Protos.FrameworkInfo.newBuilder()
+                       .setHostname(hostname);
+               Protos.Credential.Builder credential = null;
+
+               if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+                       throw new 
IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be 
configured.");
+               }
+               String masterUrl = 
flinkConfig.getString(MesosOptions.MASTER_URL);
+
+               Duration failoverTimeout = FiniteDuration.apply(
+                       flinkConfig.getInteger(
+                               MesosOptions.FAILOVER_TIMEOUT_SECONDS),
+                               TimeUnit.SECONDS);
+               frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
+
+               frameworkInfo.setName(flinkConfig.getString(
+                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
+
+               frameworkInfo.setRole(flinkConfig.getString(
+                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
+
+               frameworkInfo.setUser(flinkConfig.getString(
+                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
+
+               if 
(flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+                       frameworkInfo.setPrincipal(flinkConfig.getString(
+                               
MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
+
+                       credential = Protos.Credential.newBuilder();
+                       credential.setPrincipal(frameworkInfo.getPrincipal());
+
+                       // some environments use a side-channel to communicate 
the secret to Mesos,
+                       // and thus don't set the 'secret' configuration setting
+                       if 
(flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+                               credential.setSecret(flinkConfig.getString(
+                                       
MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
+                       }
+               }
+
+               MesosConfiguration mesos =
+                       new MesosConfiguration(masterUrl, frameworkInfo, 
scala.Option.apply(credential));
+
+               return mesos;
+       }
+
+       public static MesosTaskManagerParameters 
createTmParameters(Configuration configuration, Logger log) {
+               // TM configuration
+               final MesosTaskManagerParameters taskManagerParameters = 
MesosTaskManagerParameters.create(configuration);
+
+               log.info("TaskManagers will be created with {} task slots",
+                       
taskManagerParameters.containeredParameters().numSlots());
+               log.info("TaskManagers will be started with container size {} 
MB, JVM heap size {} MB, " +
+                               "JVM direct memory limit {} MB, {} cpus",
+                       
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
+                       
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
+                       
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
+                       taskManagerParameters.cpus());
+
+               return taskManagerParameters;
+       }
+
+       public static ContainerSpecification createContainerSpec(Configuration 
configuration, Configuration dynamicProperties)
+               throws Exception {
+               // generate a container spec which conveys the artifacts/vars 
needed to launch a TM
+               ContainerSpecification spec = new ContainerSpecification();
+
+               // propagate the AM dynamic configuration to the TM
+               spec.getDynamicConfiguration().addAll(dynamicProperties);
+
+               applyOverlays(configuration, spec);
+
+               return spec;
+       }
+
+       /**
+        * Generate a container specification as a TaskManager template.
+        *
+        * <p>This code is extremely Mesos-specific and registers all the 
artifacts that the TaskManager
+        * needs (such as JAR file, config file, ...) and all environment 
variables into a container specification.
+        * The Mesos fetcher then ensures that those artifacts will be copied 
into the task's sandbox directory.
+        * A lightweight HTTP server serves the artifacts to the fetcher.
+        */
+       public static void applyOverlays(
+               Configuration configuration, ContainerSpecification 
containerSpec) throws IOException {
+
+               // create the overlays that will produce the specification
+               CompositeContainerOverlay overlay = new 
CompositeContainerOverlay(
+                       
FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(),
+                       
HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(),
+                       
HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(),
+                       
KeytabOverlay.newBuilder().fromEnvironment(configuration).build(),
+                       
Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(),
+                       
SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build()
+               );
+
+               // apply the overlays
+               overlay.configure(containerSpec);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
new file mode 100755
index 0000000..890c4a7
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+       public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+       // 
------------------------------------------------------------------------
+       //  Command-line options
+       // 
------------------------------------------------------------------------
+
+       private static final Options ALL_OPTIONS;
+
+       static {
+               ALL_OPTIONS =
+                       new Options()
+                               
.addOption(BootstrapTools.newDynamicPropertiesOption());
+       }
+
+       private MesosConfiguration schedulerConfiguration;
+
+       private MesosServices mesosServices;
+
+       private MesosTaskManagerParameters taskManagerParameters;
+
+       private ContainerSpecification taskManagerContainerSpec;
+
+       public MesosJobClusterEntrypoint(Configuration config) {
+               super(config);
+       }
+
+       @Override
+       protected void initializeServices(Configuration config) throws 
Exception {
+               super.initializeServices(config);
+
+               final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
+
+               // Mesos configuration
+               schedulerConfiguration = 
MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+               // services
+               mesosServices = MesosServicesUtils.createMesosServices(config, 
hostname);
+
+               // TM configuration
+               taskManagerParameters = 
MesosEntrypointUtils.createTmParameters(config, LOG);
+               taskManagerContainerSpec = 
MesosEntrypointUtils.createContainerSpec(config, 
GlobalConfiguration.getDynamicProperties());
+       }
+
+       @Override
+       protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry 
metricRegistry) throws Exception {
+               super.startClusterComponents(configuration, rpcService, 
highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
+       }
+
+       @Override
+       protected ResourceManager<?> createResourceManager(
+               Configuration configuration,
+               ResourceID resourceId,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               HeartbeatServices heartbeatServices,
+               MetricRegistry metricRegistry,
+               FatalErrorHandler fatalErrorHandler) throws Exception {
+               final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
+               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       rmServicesConfiguration,
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor());
+
+               return new MesosResourceManager(
+                       rpcService,
+                       ResourceManager.RESOURCE_MANAGER_NAME,
+                       resourceId,
+                       rmConfiguration,
+                       highAvailabilityServices,
+                       heartbeatServices,
+                       rmRuntimeServices.getSlotManager(),
+                       metricRegistry,
+                       rmRuntimeServices.getJobLeaderIdService(),
+                       fatalErrorHandler,
+                       configuration,
+                       mesosServices,
+                       schedulerConfiguration,
+                       taskManagerParameters,
+                       taskManagerContainerSpec
+                       );
+       }
+
+       @Override
+       protected JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
+               String jobGraphFile = 
configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
+               File fp = new File(jobGraphFile);
+
+               try (FileInputStream input = new FileInputStream(fp);
+                       ObjectInputStream obInput = new 
ObjectInputStream(input)) {
+
+                       return (JobGraph) obInput.readObject();
+               } catch (FileNotFoundException e) {
+                       throw new FlinkException("Could not find the JobGraph 
file.", e);
+               } catch (ClassNotFoundException | IOException e) {
+                       throw new FlinkException("Could not load the JobGraph 
from file.", e);
+               }
+       }
+
+       @Override
+       protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
+               Throwable exception = null;
+
+               try {
+                       super.stopClusterComponents(cleanupHaData);
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+               }
+
+               if (mesosServices != null) {
+                       try {
+                               mesosServices.close(cleanupHaData);
+                       } catch (Throwable t) {
+                               exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+                       }
+               }
+
+               if (exception != null) {
+                       throw new FlinkException("Could not properly shut down 
the Mesos job cluster entry point.", exception);
+               }
+       }
+
+       public static void main(String[] args) {
+               // load configuration incl. dynamic properties
+               CommandLineParser parser = new PosixParser();
+               CommandLine cmd;
+               try {
+                       cmd = parser.parse(ALL_OPTIONS, args);
+               }
+               catch (Exception e){
+                       LOG.error("Could not parse the command-line options.", 
e);
+                       System.exit(STARTUP_FAILURE_RETURN_CODE);
+                       return;
+               }
+
+               Configuration configuration = 
MesosEntrypointUtils.loadConfiguration(cmd);
+
+               MesosJobClusterEntrypoint clusterEntrypoint = new 
MesosJobClusterEntrypoint(configuration);
+
+               clusterEntrypoint.startCluster();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
new file mode 100755
index 0000000..67f5899
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+/**
+ * Entry point for Mesos session clusters.
+ */
+public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
+
+       // 
------------------------------------------------------------------------
+       //  Command-line options
+       // 
------------------------------------------------------------------------
+
+       private static final Options ALL_OPTIONS;
+
+       static {
+               ALL_OPTIONS =
+                       new Options()
+                               
.addOption(BootstrapTools.newDynamicPropertiesOption());
+       }
+
+       private MesosConfiguration mesosConfig;
+
+       private MesosServices mesosServices;
+
+       private MesosTaskManagerParameters taskManagerParameters;
+
+       private ContainerSpecification taskManagerContainerSpec;
+
+       public MesosSessionClusterEntrypoint(Configuration config) {
+               super(config);
+       }
+
+       @Override
+       protected void initializeServices(Configuration config) throws 
Exception {
+               super.initializeServices(config);
+
+               final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
+
+               // Mesos configuration
+               mesosConfig = 
MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+               // services
+               mesosServices = MesosServicesUtils.createMesosServices(config, 
hostname);
+
+               // TM configuration
+               taskManagerParameters = 
MesosEntrypointUtils.createTmParameters(config, LOG);
+               taskManagerContainerSpec = 
MesosEntrypointUtils.createContainerSpec(config, 
GlobalConfiguration.getDynamicProperties());
+       }
+
+       @Override
+       protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry 
metricRegistry) throws Exception {
+               super.startClusterComponents(configuration, rpcService, 
highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
+       }
+
+       @Override
+       protected ResourceManager<?> createResourceManager(
+               Configuration configuration,
+               ResourceID resourceId,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               HeartbeatServices heartbeatServices,
+               MetricRegistry metricRegistry,
+               FatalErrorHandler fatalErrorHandler) throws Exception {
+               final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
+               final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+               final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
+                       rmServicesConfiguration,
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor());
+
+               return new MesosResourceManager(
+                       rpcService,
+                       ResourceManager.RESOURCE_MANAGER_NAME,
+                       resourceId,
+                       rmConfiguration,
+                       highAvailabilityServices,
+                       heartbeatServices,
+                       rmRuntimeServices.getSlotManager(),
+                       metricRegistry,
+                       rmRuntimeServices.getJobLeaderIdService(),
+                       fatalErrorHandler,
+                       configuration,
+                       mesosServices,
+                       mesosConfig,
+                       taskManagerParameters,
+                       taskManagerContainerSpec
+                       );
+       }
+
+       @Override
+       protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
+               Throwable exception = null;
+
+               try {
+                       super.stopClusterComponents(cleanupHaData);
+               } catch (Throwable t) {
+                       exception = t;
+               }
+
+               if (mesosServices != null) {
+                       try {
+                               mesosServices.close(cleanupHaData);
+                       } catch (Throwable t) {
+                               exception = t;
+                       }
+               }
+
+               if (exception != null) {
+                       throw new FlinkException("Could not properly shut down 
the Mesos session cluster entry point.", exception);
+               }
+       }
+
+       public static void main(String[] args) {
+               // load configuration incl. dynamic properties
+               CommandLineParser parser = new PosixParser();
+               CommandLine cmd;
+               try {
+                       cmd = parser.parse(ALL_OPTIONS, args);
+               }
+               catch (Exception e){
+                       LOG.error("Could not parse the command-line options.", 
e);
+                       System.exit(STARTUP_FAILURE_RETURN_CODE);
+                       return;
+               }
+
+               Configuration configuration = 
MesosEntrypointUtils.loadConfiguration(cmd);
+
+               MesosSessionClusterEntrypoint clusterEntrypoint = new 
MesosSessionClusterEntrypoint(configuration);
+
+               clusterEntrypoint.startCluster();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
new file mode 100644
index 0000000..c4343d2
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskExecutorRunner {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskExecutorRunner.class);
+
+       private static final int INIT_ERROR_EXIT_CODE = 31;
+
+       private static final Options ALL_OPTIONS;
+
+       static {
+               ALL_OPTIONS =
+                       new Options()
+                               
.addOption(BootstrapTools.newDynamicPropertiesOption());
+       }
+
+       public static void main(String[] args) throws Exception {
+               EnvironmentInformation.logEnvironmentInfo(LOG, 
MesosTaskExecutorRunner.class.getSimpleName(), args);
+               SignalHandler.register(LOG);
+               JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+               // try to parse the command line arguments
+               CommandLineParser parser = new PosixParser();
+               CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
+               final Configuration configuration;
+               try {
+                       configuration = 
MesosEntrypointUtils.loadConfiguration(cmd);
+               }
+               catch (Throwable t) {
+                       LOG.error("Failed to load the TaskManager configuration 
and dynamic properties.", t);
+                       System.exit(INIT_ERROR_EXIT_CODE);
+                       return;
+               }
+
+               // read the environment variables
+               final Map<String, String> envs = System.getenv();
+               final String tmpDirs = 
envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+               // configure local directory
+               String flinkTempDirs = 
configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+               if (flinkTempDirs != null) {
+                       LOG.info("Overriding Mesos temporary file directories 
with those " +
+                               "specified in the Flink config: {}", 
flinkTempDirs);
+               }
+               else if (tmpDirs != null) {
+                       LOG.info("Setting directories for temporary files to: 
{}", tmpDirs);
+                       
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
+               }
+
+               // configure the default filesystem
+               try {
+                       FileSystem.setDefaultScheme(configuration);
+               } catch (IOException e) {
+                       throw new IOException("Error while setting the default 
" +
+                               "filesystem scheme from configuration.", e);
+               }
+
+               // tell akka to die in case of an error
+               configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true);
+
+               // Infer the resource identifier from the environment variable
+               String containerID = 
Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID));
+               final ResourceID resourceId = new ResourceID(containerID);
+               LOG.info("ResourceID assigned for this container: {}", 
resourceId);
+
+               // Run the TM in the security context
+               SecurityUtils.SecurityConfiguration sc = new 
SecurityUtils.SecurityConfiguration(configuration);
+               SecurityUtils.install(sc);
+
+               try {
+                       SecurityUtils.getInstalledContext().runSecured(new 
Callable<Integer>() {
+                               @Override
+                               public Integer call() throws Exception {
+                                       
TaskManagerRunner.runTaskManager(configuration, resourceId);
+
+                                       return 0;
+                               }
+                       });
+               }
+               catch (Throwable t) {
+                       LOG.error("Error while starting the TaskManager", t);
+                       System.exit(INIT_ERROR_EXIT_CODE);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index ce7bb9d..2c32507 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.Utils;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -36,6 +37,7 @@ import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -261,12 +263,15 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, 
mesosConfiguration.frameworkInfo().getName()));
 
                // build the launch command w/ dynamic application properties
-               Option<String> bootstrapCmdOption = params.bootstrapCommand();
-
-               final String bootstrapCommand = bootstrapCmdOption.isDefined() 
? bootstrapCmdOption.get() + " && " : "";
-               final String launchCommand = bootstrapCommand + 
"$FLINK_HOME/bin/mesos-taskmanager.sh " + 
ContainerSpecification.formatSystemProperties(dynamicProperties);
-
-               cmd.setValue(launchCommand);
+               StringBuilder launchCommand = new StringBuilder();
+               if (params.bootstrapCommand().isDefined()) {
+                       
launchCommand.append(params.bootstrapCommand().get()).append(" && ");
+               }
+               launchCommand
+                       .append(params.command())
+                       .append(" ")
+                       
.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+               cmd.setValue(launchCommand.toString());
 
                // build the container info
                Protos.ContainerInfo.Builder containerInfo = 
Protos.ContainerInfo.newBuilder();
@@ -312,4 +317,17 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                        "taskRequest=" + taskRequest +
                        '}';
        }
+
+       /**
+        * Configures an artifact server to serve the artifacts associated with 
a container specification.
+        * @param server the server to configure.
+        * @param container the container with artifacts to serve.
+        * @throws IOException if the artifacts cannot be accessed.
+        */
+       static void configureArtifactServer(MesosArtifactServer server, 
ContainerSpecification container) throws IOException {
+               // serve the artifacts associated with the container environment
+               for (ContainerSpecification.Artifact artifact : 
container.getArtifacts()) {
+                       server.addPath(artifact.source, artifact.dest);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
old mode 100644
new mode 100755
index 7891386..3d16a66
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -21,11 +21,10 @@ package org.apache.flink.mesos.runtime.clusterframework;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -34,13 +33,6 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import 
org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
-import 
org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
-import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -65,7 +57,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +64,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -81,7 +71,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -222,7 +211,7 @@ public class MesosApplicationMasterRunner {
                        LOG.info("App Master Hostname to use: {}", 
appMasterHostname);
 
                        // Mesos configuration
-                       final MesosConfiguration mesosConfig = 
createMesosConfig(config, appMasterHostname);
+                       final MesosConfiguration mesosConfig = 
MesosEntrypointUtils.createMesosSchedulerConfiguration(config, 
appMasterHostname);
 
                        // JM configuration
                        int numberProcessors = Hardware.getNumberCPUCores();
@@ -235,19 +224,10 @@ public class MesosApplicationMasterRunner {
                                numberProcessors,
                                new 
ExecutorThreadFactory("mesos-jobmanager-io"));
 
-                       mesosServices = 
MesosServicesUtils.createMesosServices(config);
+                       mesosServices = 
MesosServicesUtils.createMesosServices(config, appMasterHostname);
 
                        // TM configuration
-                       final MesosTaskManagerParameters taskManagerParameters 
= MesosTaskManagerParameters.create(config);
-
-                       LOG.info("TaskManagers will be created with {} task 
slots",
-                               
taskManagerParameters.containeredParameters().numSlots());
-                       LOG.info("TaskManagers will be started with container 
size {} MB, JVM heap size {} MB, " +
-                                       "JVM direct memory limit {} MB, {} 
cpus",
-                               
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
-                               
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
-                               
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
-                               taskManagerParameters.cpus());
+                       final MesosTaskManagerParameters taskManagerParameters 
= MesosEntrypointUtils.createTmParameters(config, LOG);
 
                        // JM endpoint, which should be explicitly configured 
based on acquired net resources
                        final int listeningPort = 
config.getInteger(JobManagerOptions.PORT);
@@ -268,9 +248,7 @@ public class MesosApplicationMasterRunner {
 
                        // try to start the artifact server
                        LOG.debug("Starting Artifact Server");
-                       final int artifactServerPort = 
config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT);
-                       final String artifactServerPrefix = 
UUID.randomUUID().toString();
-                       artifactServer = new 
MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, 
config);
+                       artifactServer = mesosServices.getArtifactServer();
 
                        // ----------------- (3) Generate the configuration for 
the TaskManagers -------------------
 
@@ -287,10 +265,10 @@ public class MesosApplicationMasterRunner {
                        
taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig);
 
                        // apply the overlays
-                       applyOverlays(config, taskManagerContainerSpec);
+                       MesosEntrypointUtils.applyOverlays(config, 
taskManagerContainerSpec);
 
                        // configure the artifact server to serve the specified 
artifacts
-                       configureArtifactServer(artifactServer, 
taskManagerContainerSpec);
+                       
LaunchableMesosWorker.configureArtifactServer(artifactServer, 
taskManagerContainerSpec);
 
                        // ----------------- (4) start the actors 
-------------------
 
@@ -386,14 +364,6 @@ public class MesosApplicationMasterRunner {
                                }
                        }
 
-                       if (artifactServer != null) {
-                               try {
-                                       artifactServer.stop();
-                               } catch (Throwable ignored) {
-                                       LOG.error("Failed to stop the artifact 
server", ignored);
-                               }
-                       }
-
                        if (actorSystem != null) {
                                try {
                                        actorSystem.shutdown();
@@ -444,12 +414,6 @@ public class MesosApplicationMasterRunner {
                        }
                }
 
-               try {
-                       artifactServer.stop();
-               } catch (Throwable t) {
-                       LOG.error("Failed to stop the artifact server", t);
-               }
-
                if (highAvailabilityServices != null) {
                        try {
                                highAvailabilityServices.close();
@@ -490,85 +454,4 @@ public class MesosApplicationMasterRunner {
                return MemoryArchivist.class;
        }
 
-       /**
-        * Loads and validates the ResourceManager Mesos configuration from the 
given Flink configuration.
-        */
-       public static MesosConfiguration createMesosConfig(Configuration 
flinkConfig, String hostname) {
-
-               Protos.FrameworkInfo.Builder frameworkInfo = 
Protos.FrameworkInfo.newBuilder()
-                       .setHostname(hostname);
-               Protos.Credential.Builder credential = null;
-
-               if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
-                       throw new 
IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be 
configured.");
-               }
-               String masterUrl = 
flinkConfig.getString(MesosOptions.MASTER_URL);
-
-               Duration failoverTimeout = FiniteDuration.apply(
-                       flinkConfig.getInteger(
-                               MesosOptions.FAILOVER_TIMEOUT_SECONDS),
-                       TimeUnit.SECONDS);
-               frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
-
-               frameworkInfo.setName(flinkConfig.getString(
-                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
-
-               frameworkInfo.setRole(flinkConfig.getString(
-                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
-
-               frameworkInfo.setUser(flinkConfig.getString(
-                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
-
-               if 
(flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
-                       frameworkInfo.setPrincipal(flinkConfig.getString(
-                               
MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
-
-                       credential = Protos.Credential.newBuilder();
-                       credential.setPrincipal(frameworkInfo.getPrincipal());
-
-                       // some environments use a side-channel to communicate 
the secret to Mesos,
-                       // and thus don't set the 'secret' configuration setting
-                       if 
(flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
-                               credential.setSecret(flinkConfig.getString(
-                                       
MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
-                       }
-               }
-
-               MesosConfiguration mesos =
-                       new MesosConfiguration(masterUrl, frameworkInfo, 
scala.Option.apply(credential));
-
-               return mesos;
-       }
-
-       /**
-        * Generate a container specification as a TaskManager template.
-        *
-        * <p>This code is extremely Mesos-specific and registers all the 
artifacts that the TaskManager
-        * needs (such as JAR file, config file, ...) and all environment 
variables into a container specification.
-        * The Mesos fetcher then ensures that those artifacts will be copied 
into the task's sandbox directory.
-        * A lightweight HTTP server serves the artifacts to the fetcher.
-        */
-       private static void applyOverlays(
-               Configuration globalConfiguration, ContainerSpecification 
containerSpec) throws IOException {
-
-               // create the overlays that will produce the specification
-               CompositeContainerOverlay overlay = new 
CompositeContainerOverlay(
-                       
FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-                       
HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-                       
HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-                       
KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-                       
Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
-                       
SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build()
-               );
-
-               // apply the overlays
-               overlay.configure(containerSpec);
-       }
-
-       private static void configureArtifactServer(MesosArtifactServer server, 
ContainerSpecification container) throws IOException {
-               // serve the artifacts associated with the container environment
-               for (ContainerSpecification.Artifact artifact : 
container.getArtifacts()) {
-                       server.addPath(artifact.source, artifact.dest);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 736af59..445010b 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -38,7 +39,7 @@ import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.SlaveLost;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
@@ -75,6 +76,7 @@ import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -98,17 +100,20 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
        /** The Mesos configuration (master and framework info). */
        private final MesosConfiguration mesosConfig;
 
+       /** The Mesos services needed by the resource manager. */
+       private final MesosServices mesosServices;
+
        /** The TaskManager container parameters (like container memory size). 
*/
        private final MesosTaskManagerParameters taskManagerParameters;
 
        /** Container specification for launching a TM. */
        private final ContainerSpecification taskManagerContainerSpec;
 
-       /** Resolver for HTTP artifacts. */
-       private final MesosArtifactResolver artifactResolver;
+       /** Server for HTTP artifacts. */
+       private final MesosArtifactServer artifactServer;
 
        /** Persistent storage of allocated containers. */
-       private final MesosWorkerStore workerStore;
+       private MesosWorkerStore workerStore;
 
        /** A local actor system for using the helper actors. */
        private final ActorSystem actorSystem;
@@ -145,13 +150,11 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler,
                        // Mesos specifics
-                       ActorSystem actorSystem,
                        Configuration flinkConfig,
+                       MesosServices mesosServices,
                        MesosConfiguration mesosConfig,
-                       MesosWorkerStore workerStore,
                        MesosTaskManagerParameters taskManagerParameters,
-                       ContainerSpecification taskManagerContainerSpec,
-                       MesosArtifactResolver artifactResolver) {
+                       ContainerSpecification taskManagerContainerSpec) {
                super(
                        rpcService,
                        resourceManagerEndpointId,
@@ -164,13 +167,13 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        jobLeaderIdService,
                        fatalErrorHandler);
 
-               this.actorSystem = Preconditions.checkNotNull(actorSystem);
+               this.mesosServices = Preconditions.checkNotNull(mesosServices);
+               this.actorSystem = 
Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
 
                this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
                this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
 
-               this.workerStore = Preconditions.checkNotNull(workerStore);
-               this.artifactResolver = 
Preconditions.checkNotNull(artifactResolver);
+               this.artifactServer = 
Preconditions.checkNotNull(mesosServices.getArtifactServer());
 
                this.taskManagerParameters = 
Preconditions.checkNotNull(taskManagerParameters);
                this.taskManagerContainerSpec = 
Preconditions.checkNotNull(taskManagerContainerSpec);
@@ -221,8 +224,9 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
         */
        @Override
        protected void initialize() throws ResourceManagerException {
-               // start the worker store
+               // create and start the worker store
                try {
+                       this.workerStore = 
mesosServices.createMesosWorkerStore(flinkConfig, 
getRpcService().getExecutor());
                        workerStore.start();
                } catch (Exception e) {
                        throw new ResourceManagerException("Unable to 
initialize the worker store.", e);
@@ -266,6 +270,14 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        throw new ResourceManagerException("Unable to recover 
Mesos worker state.", e);
                }
 
+               // configure the artifact server to serve the TM container 
artifacts
+               try {
+                       
LaunchableMesosWorker.configureArtifactServer(artifactServer, 
taskManagerContainerSpec);
+               }
+               catch (IOException e) {
+                       throw new ResourceManagerException("Unable to configure 
the artifact server with TaskManager artifacts.", e);
+               }
+
                // begin scheduling
                connectionMonitor.tell(new ConnectionMonitor.Start(), 
selfActor);
                schedulerDriver.start();
@@ -627,20 +639,23 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        taskManagerParameters.containerType(),
                        taskManagerParameters.containerImageName(),
                        new ContaineredTaskManagerParameters(
-                               resourceProfile.getMemoryInMB() < 0 ? 
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : 
resourceProfile.getMemoryInMB(),
-                               resourceProfile.getHeapMemoryInMB(),
-                               resourceProfile.getDirectMemoryInMB(),
+                               ResourceProfile.UNKNOWN.equals(resourceProfile) 
? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : 
resourceProfile.getMemoryInMB(),
+                               ResourceProfile.UNKNOWN.equals(resourceProfile) 
? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : 
resourceProfile.getHeapMemoryInMB(),
+                               ResourceProfile.UNKNOWN.equals(resourceProfile) 
? 
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() 
: resourceProfile.getDirectMemoryInMB(),
                                1,
                                new 
HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
                        taskManagerParameters.containerVolumes(),
                        taskManagerParameters.constraints(),
+                       taskManagerParameters.command(),
                        taskManagerParameters.bootstrapCommand(),
                        taskManagerParameters.getTaskManagerHostname()
                );
 
+               LOG.debug("LaunchableMesosWorker parameters: {}", params);
+
                LaunchableMesosWorker launchable =
                        new LaunchableMesosWorker(
-                               artifactResolver,
+                               artifactServer,
                                params,
                                taskManagerContainerSpec,
                                taskID,

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index f5a415e..3859913 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -74,6 +74,10 @@ public class MesosTaskManagerParameters {
                key("mesos.resourcemanager.tasks.hostname")
                .noDefaultValue();
 
+       public static final ConfigOption<String> MESOS_TM_CMD =
+               key("mesos.resourcemanager.tasks.taskmanager-cmd")
+               .defaultValue("$FLINK_HOME/bin/mesos-taskmanager.sh"); // 
internal
+
        public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
                key("mesos.resourcemanager.tasks.bootstrap-cmd")
                .noDefaultValue();
@@ -107,6 +111,8 @@ public class MesosTaskManagerParameters {
 
        private final List<ConstraintEvaluator> constraints;
 
+       private final String command;
+
        private final Option<String> bootstrapCommand;
 
        private final Option<String> taskManagerHostname;
@@ -118,6 +124,7 @@ public class MesosTaskManagerParameters {
                        ContaineredTaskManagerParameters containeredParameters,
                        List<Protos.Volume> containerVolumes,
                        List<ConstraintEvaluator> constraints,
+                       String command,
                        Option<String> bootstrapCommand,
                        Option<String> taskManagerHostname) {
 
@@ -127,6 +134,7 @@ public class MesosTaskManagerParameters {
                this.containeredParameters = 
Preconditions.checkNotNull(containeredParameters);
                this.containerVolumes = 
Preconditions.checkNotNull(containerVolumes);
                this.constraints = Preconditions.checkNotNull(constraints);
+               this.command = Preconditions.checkNotNull(command);
                this.bootstrapCommand = 
Preconditions.checkNotNull(bootstrapCommand);
                this.taskManagerHostname = 
Preconditions.checkNotNull(taskManagerHostname);
        }
@@ -183,6 +191,13 @@ public class MesosTaskManagerParameters {
        }
 
        /**
+        * Get the command.
+        */
+       public String command() {
+               return command;
+       }
+
+       /**
         * Get the bootstrap command.
         */
        public Option<String> bootstrapCommand() {
@@ -199,6 +214,7 @@ public class MesosTaskManagerParameters {
                        ", containerVolumes=" + containerVolumes +
                        ", constraints=" + constraints +
                        ", taskManagerHostName=" + taskManagerHostname +
+                       ", command=" + command +
                        ", bootstrapCommand=" + bootstrapCommand +
                        '}';
        }
@@ -249,7 +265,8 @@ public class MesosTaskManagerParameters {
                //obtain Task Manager Host Name from the configuration
                Option<String> taskManagerHostname = 
Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
 
-               //obtain bootstrap command from the configuration
+               //obtain command-line from the configuration
+               String tmCommand = flinkConfig.getString(MESOS_TM_CMD);
                Option<String> tmBootstrapCommand = 
Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
 
                return new MesosTaskManagerParameters(
@@ -259,6 +276,7 @@ public class MesosTaskManagerParameters {
                        containeredParameters,
                        containerVolumes,
                        constraints,
+                       tmCommand,
                        tmBootstrapCommand,
                        taskManagerHostname);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
new file mode 100644
index 0000000..e4f4cf7
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/AbstractMesosServices.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.ActorSystem;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abrstact implementation of {@link MesosServices}.
+ */
+public abstract class AbstractMesosServices implements MesosServices {
+
+       private final ActorSystem actorSystem;
+
+       private final MesosArtifactServer artifactServer;
+
+       protected AbstractMesosServices(ActorSystem actorSystem, 
MesosArtifactServer artifactServer) {
+               this.actorSystem = checkNotNull(actorSystem);
+               this.artifactServer = checkNotNull(artifactServer);
+       }
+
+       @Override
+       public ActorSystem getLocalActorSystem() {
+               return actorSystem;
+       }
+
+       @Override
+       public MesosArtifactServer getArtifactServer() {
+               return artifactServer;
+       }
+
+       @Override
+       public void close(boolean cleanup) throws Exception {
+               Throwable exception = null;
+
+               try {
+                       actorSystem.shutdown();
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+               }
+
+               try {
+                       artifactServer.stop();
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+               }
+
+               if (exception != null) {
+                       throw new FlinkException("Could not properly shut down 
the Mesos services.", exception);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
index 5655bfc..6a64f4f 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
@@ -20,6 +20,9 @@ package 
org.apache.flink.mesos.runtime.clusterframework.services;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+
+import akka.actor.ActorSystem;
 
 import java.util.concurrent.Executor;
 
@@ -42,6 +45,20 @@ public interface MesosServices {
                Executor executor) throws Exception;
 
        /**
+        * Gets a local {@link ActorSystem} which is used for child actors 
within
+        * {@link 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager}.
+        *
+        * @return a reference to an actor system.
+        */
+       ActorSystem getLocalActorSystem();
+
+       /**
+        * Gets the artifact server with which to serve essential resources to 
task managers.
+        * @return a reference to an artifact server.
+        */
+       MesosArtifactServer getArtifactServer();
+
+       /**
         * Closes all state maintained by the mesos services implementation.
         *
         * @param cleanup is true if a cleanup shall be performed

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index 370a760..c5a8516 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -20,9 +20,16 @@ package 
org.apache.flink.mesos.runtime.clusterframework.services;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
 
+import akka.actor.ActorSystem;
+
+import java.util.UUID;
+
 /**
  * Utilities for the {@link MesosServices}.
  */
@@ -32,15 +39,21 @@ public class MesosServicesUtils {
         * Creates a {@link MesosServices} instance depending on the high 
availability settings.
         *
         * @param configuration containing the high availability settings
+        * @param hostname the hostname to advertise to remote clients
         * @return a mesos services instance
         * @throws Exception if the mesos services instance could not be created
         */
-       public static MesosServices createMesosServices(Configuration 
configuration) throws Exception {
+       public static MesosServices createMesosServices(Configuration 
configuration, String hostname) throws Exception {
+
+               ActorSystem localActorSystem = 
AkkaUtils.createLocalActorSystem(configuration);
+
+               MesosArtifactServer artifactServer = 
createArtifactServer(configuration, hostname);
+
                HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(configuration);
 
                switch (highAvailabilityMode) {
                        case NONE:
-                               return new StandaloneMesosServices();
+                               return new 
StandaloneMesosServices(localActorSystem, artifactServer);
 
                        case ZOOKEEPER:
                                final String zkMesosRootPath = 
configuration.getString(
@@ -50,10 +63,19 @@ public class MesosServicesUtils {
                                        configuration,
                                        zkMesosRootPath);
 
-                               return new 
ZooKeeperMesosServices(zooKeeperUtilityFactory);
+                               return new 
ZooKeeperMesosServices(localActorSystem, artifactServer, 
zooKeeperUtilityFactory);
 
                        default:
                                throw new Exception("High availability mode " + 
highAvailabilityMode + " is not supported.");
                }
        }
+
+       private static MesosArtifactServer createArtifactServer(Configuration 
configuration, String hostname) throws Exception {
+               final int artifactServerPort = 
configuration.getInteger(MesosOptions.ARTIFACT_SERVER_PORT, 0);
+
+               // a random prefix is affixed to artifact URLs to ensure 
uniqueness in the Mesos fetcher cache
+               final String artifactServerPrefix = 
UUID.randomUUID().toString();
+
+               return new MesosArtifactServer(artifactServerPrefix, hostname, 
artifactServerPort, configuration);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
index aa3157f..b93fd29 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -21,13 +21,20 @@ package 
org.apache.flink.mesos.runtime.clusterframework.services;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+
+import akka.actor.ActorSystem;
 
 import java.util.concurrent.Executor;
 
 /**
  * {@link MesosServices} implementation for the standalone mode.
  */
-public class StandaloneMesosServices implements MesosServices {
+public class StandaloneMesosServices extends AbstractMesosServices {
+
+       protected StandaloneMesosServices(ActorSystem actorSystem, 
MesosArtifactServer artifactServer) {
+               super(actorSystem, artifactServer);
+       }
 
        @Override
        public MesosWorkerStore createMesosWorkerStore(Configuration 
configuration, Executor executor) {
@@ -36,5 +43,6 @@ public class StandaloneMesosServices implements MesosServices 
{
 
        @Override
        public void close(boolean cleanup) throws Exception {
+               super.close(cleanup);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 2883e4f..069cb83 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -21,25 +21,31 @@ package 
org.apache.flink.mesos.runtime.clusterframework.services;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import akka.actor.ActorSystem;
+
 import java.util.concurrent.Executor;
 
 /**
  * {@link MesosServices} implementation for the ZooKeeper high availability 
based mode.
  */
-public class ZooKeeperMesosServices implements MesosServices {
+public class ZooKeeperMesosServices extends AbstractMesosServices {
 
        // Factory to create ZooKeeper utility classes
        private final ZooKeeperUtilityFactory zooKeeperUtilityFactory;
 
-       public ZooKeeperMesosServices(ZooKeeperUtilityFactory 
zooKeeperUtilityFactory) {
+       public ZooKeeperMesosServices(ActorSystem actorSystem, 
MesosArtifactServer artifactServer, ZooKeeperUtilityFactory 
zooKeeperUtilityFactory) {
+               super(actorSystem, artifactServer);
                this.zooKeeperUtilityFactory = 
Preconditions.checkNotNull(zooKeeperUtilityFactory);
        }
 
@@ -64,7 +70,23 @@ public class ZooKeeperMesosServices implements MesosServices 
{
 
        @Override
        public void close(boolean cleanup) throws Exception {
-               // this also closes the underlying CuratorFramework instance
-               zooKeeperUtilityFactory.close(cleanup);
+               Throwable exception = null;
+
+               try {
+                       // this also closes the underlying CuratorFramework 
instance
+                       zooKeeperUtilityFactory.close(cleanup);
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+               }
+
+               try {
+                       super.close(cleanup);
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+               }
+
+               if (exception != null) {
+                       throw new FlinkException("Could not properly shut down 
the Mesos services.", exception);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 8bfb4d1..ff32486 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -251,6 +251,7 @@ public class MesosFlinkResourceManagerTest extends 
TestLogger {
                                containeredParams,
                                Collections.<Protos.Volume>emptyList(),
                                Collections.<ConstraintEvaluator>emptyList(),
+                               "",
                                Option.<String>empty(),
                                Option.<String>empty());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index e81a2de..4bbcb25 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.mesos.runtime.clusterframework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -32,7 +33,7 @@ import org.apache.flink.mesos.scheduler.messages.ReRegistered;
 import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
-import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -159,17 +160,15 @@ public class MesosResourceManagerTest extends TestLogger {
                        FatalErrorHandler fatalErrorHandler,
 
                        // Mesos specifics
-                       ActorSystem actorSystem,
                        Configuration flinkConfig,
+                       MesosServices mesosServices,
                        MesosConfiguration mesosConfig,
-                       MesosWorkerStore workerStore,
                        MesosTaskManagerParameters taskManagerParameters,
-                       ContainerSpecification taskManagerContainerSpec,
-                       MesosArtifactResolver artifactResolver) {
+                       ContainerSpecification taskManagerContainerSpec) {
                        super(rpcService, resourceManagerEndpointId, 
resourceId, resourceManagerConfiguration,
                                highAvailabilityServices, heartbeatServices, 
slotManager, metricRegistry,
-                               jobLeaderIdService, fatalErrorHandler, 
actorSystem, flinkConfig, mesosConfig, workerStore,
-                               taskManagerParameters, 
taskManagerContainerSpec, artifactResolver);
+                               jobLeaderIdService, fatalErrorHandler, 
flinkConfig, mesosServices, mesosConfig,
+                               taskManagerParameters, 
taskManagerContainerSpec);
                }
 
                @Override
@@ -208,6 +207,7 @@ public class MesosResourceManagerTest extends TestLogger {
                TestingRpcService rpcService;
                TestingFatalErrorHandler fatalErrorHandler;
                MockMesosResourceManagerRuntimeServices rmServices;
+               MockMesosServices mesosServices;
 
                // RM
                ResourceManagerConfiguration rmConfiguration;
@@ -242,6 +242,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        rpcService = new TestingRpcService();
                        fatalErrorHandler = new TestingFatalErrorHandler();
                        rmServices = new 
MockMesosResourceManagerRuntimeServices();
+                       mesosServices = new MockMesosServices();
 
                        // TaskExecutor templating
                        ContainerSpecification containerSpecification = new 
ContainerSpecification();
@@ -249,7 +250,7 @@ public class MesosResourceManagerTest extends TestLogger {
                                new ContaineredTaskManagerParameters(1024, 768, 
256, 4, new HashMap<String, String>());
                        MesosTaskManagerParameters tmParams = new 
MesosTaskManagerParameters(
                                1.0, 
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), 
containeredParams,
-                               Collections.<Protos.Volume>emptyList(), 
Collections.<ConstraintEvaluator>emptyList(), Option.<String>empty(),
+                               Collections.<Protos.Volume>emptyList(), 
Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
                                Option.<String>empty());
 
                        // resource manager
@@ -270,13 +271,11 @@ public class MesosResourceManagerTest extends TestLogger {
                                        rmServices.jobLeaderIdService,
                                        fatalErrorHandler,
                                        // Mesos specifics
-                                       system,
                                        flinkConfig,
+                                       mesosServices,
                                        rmServices.mesosConfig,
-                                       rmServices.workerStore,
                                        tmParams,
-                                       containerSpecification,
-                                       rmServices.artifactResolver
+                                       containerSpecification
                                );
 
                        // TaskExecutors
@@ -341,7 +340,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        public SchedulerDriver schedulerDriver;
                        public MesosConfiguration mesosConfig;
                        public MesosWorkerStore workerStore;
-                       public MesosArtifactResolver artifactResolver;
+                       public MesosArtifactServer artifactServer;
 
                        MockMesosResourceManagerRuntimeServices() throws 
Exception {
                                schedulerDriver = mock(SchedulerDriver.class);
@@ -354,7 +353,28 @@ public class MesosResourceManagerTest extends TestLogger {
                                workerStore = mock(MesosWorkerStore.class);
                                
when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
 
-                               artifactResolver = 
mock(MesosArtifactResolver.class);
+                               artifactServer = 
mock(MesosArtifactServer.class);
+                       }
+               }
+
+               class MockMesosServices implements MesosServices {
+                       @Override
+                       public MesosWorkerStore 
createMesosWorkerStore(Configuration configuration, Executor executor) throws 
Exception {
+                               return rmServices.workerStore;
+                       }
+
+                       @Override
+                       public ActorSystem getLocalActorSystem() {
+                               return system;
+                       }
+
+                       @Override
+                       public MesosArtifactServer getArtifactServer() {
+                               return rmServices.artifactServer;
+                       }
+
+                       @Override
+                       public void close(boolean cleanup) throws Exception {
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
old mode 100644
new mode 100755
index 2538f20..1551933
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 
@@ -92,6 +94,8 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                LOG.info("Starting {}.", getClass().getSimpleName());
 
                try {
+                       installDefaultFileSystem(configuration);
+
                        SecurityContext securityContext = 
installSecurityContext(configuration);
 
                        securityContext.runSecured(new Callable<Void>() {
@@ -115,6 +119,17 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                }
        }
 
+       protected void installDefaultFileSystem(Configuration configuration) 
throws Exception {
+               LOG.info("Install default filesystem.");
+
+               try {
+                       FileSystem.setDefaultScheme(configuration);
+               } catch (IOException e) {
+                       throw new IOException("Error while setting the default 
" +
+                               "filesystem scheme from configuration.", e);
+               }
+       }
+
        protected SecurityContext installSecurityContext(Configuration 
configuration) throws Exception {
                LOG.info("Install security context.");
 
@@ -184,9 +199,18 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        }
 
        protected void shutDown(boolean cleanupHaData) throws FlinkException {
+               LOG.info("Stopping {}.", getClass().getSimpleName());
+
                Throwable exception = null;
 
                synchronized (lock) {
+
+                       try {
+                               stopClusterComponents(cleanupHaData);
+                       } catch (Throwable t) {
+                               exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+                       }
+
                        if (metricRegistry != null) {
                                try {
                                        metricRegistry.shutdown();
@@ -244,6 +268,9 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry) throws Exception;
 
+       protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
+       }
+
        protected static ClusterConfiguration parseArguments(String[] args) {
                ParameterTool parameterTool = ParameterTool.fromArgs(args);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index a7c6120..e70f6c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -110,7 +110,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
        }
 
        @Override
-       protected void shutDown(boolean cleanupHaData) throws FlinkException {
+       protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
                Throwable exception = null;
 
                if (jobManagerRunner != null) {
@@ -129,14 +129,8 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        }
                }
 
-               try {
-                       super.shutDown(cleanupHaData);
-               } catch (Throwable t) {
-                       exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
-               }
-
                if (exception != null) {
-                       throw new FlinkException("Could not properly shut down 
the session cluster entry point.", exception);
+                       throw new FlinkException("Could not properly shut down 
the job cluster entry point.", exception);
                }
        }
 

Reply via email to