[FLINK-1984] Mesos ResourceManager - T1 milestone

Implemented Mesos AppMaster including:
- runners for AppMaster and TaskManager
- MesosFlinkResourceManager as a Mesos framework
- ZK persistent storage for Mesos tasks
- reusable scheduler actors for:
  - offer handling using Netflix Fenzo (LaunchCoordinator)
  - reconciliation (ReconciliationCoordinator)
  - task monitoring (TaskMonitor)
  - connection monitoring (ConnectionMonitor)
- lightweight HTTP server to serve artifacts to the Mesos fetcher (ArtifactServer)
- scenario-based logging for:
  - connectivity issues
  - offer handling (receive, process, decline, rescind, accept)
- incorporated FLINK-4152, FLINK-3904, FLINK-4141, FLINK-3675, FLINK-4166


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9b2be05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9b2be05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9b2be05

Branch: refs/heads/master
Commit: d9b2be054f7dadf902d74622352e3ec8dfdcd584
Parents: 578e80e
Author: wrighe3 <eron.wri...@emc.com>
Authored: Thu Jul 14 00:12:49 2016 -0700
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Aug 29 17:27:10 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  76 ++
 flink-dist/pom.xml                              |   7 +-
 flink-mesos/pom.xml                             | 294 ++++++++
 .../main/java/org/apache/flink/mesos/Utils.java |  49 ++
 .../flink/mesos/cli/FlinkMesosSessionCli.java   |  41 ++
 .../clusterframework/LaunchableMesosWorker.java | 187 +++++
 .../MesosApplicationMasterRunner.java           | 600 +++++++++++++++
 .../clusterframework/MesosConfigKeys.java       |  26 +
 .../MesosFlinkResourceManager.java              | 737 +++++++++++++++++++
 .../MesosTaskManagerParameters.java             |  51 ++
 .../MesosTaskManagerRunner.java                 |  99 +++
 .../RegisteredMesosWorkerNode.scala             |  15 +
 .../store/MesosWorkerStore.java                 | 134 ++++
 .../store/StandaloneMesosWorkerStore.java       |  69 ++
 .../store/ZooKeeperMesosWorkerStore.java        | 272 +++++++
 .../flink/mesos/scheduler/LaunchableTask.java   |  24 +
 .../flink/mesos/scheduler/SchedulerProxy.java   |  87 +++
 .../mesos/scheduler/TaskSchedulerBuilder.java   |  16 +
 .../mesos/scheduler/messages/AcceptOffers.java  |  56 ++
 .../mesos/scheduler/messages/Connected.java     |   8 +
 .../mesos/scheduler/messages/Disconnected.java  |  12 +
 .../flink/mesos/scheduler/messages/Error.java   |  24 +
 .../scheduler/messages/OfferRescinded.java      |  26 +
 .../mesos/scheduler/messages/ReRegistered.java  |  30 +
 .../mesos/scheduler/messages/Registered.java    |  39 +
 .../scheduler/messages/ResourceOffers.java      |  30 +
 .../mesos/scheduler/messages/SlaveLost.java     |  26 +
 .../mesos/scheduler/messages/StatusUpdate.java  |  27 +
 .../flink/mesos/util/MesosArtifactServer.java   | 286 +++++++
 .../flink/mesos/util/MesosConfiguration.java    | 108 +++
 .../apache/flink/mesos/util/ZooKeeperUtils.java |  22 +
 flink-mesos/src/main/resources/log4j.properties |  27 +
 .../clusterframework/MesosJobManager.scala      |  66 ++
 .../clusterframework/MesosTaskManager.scala     |  47 ++
 .../mesos/scheduler/ConnectionMonitor.scala     | 108 +++
 .../mesos/scheduler/LaunchCoordinator.scala     | 331 +++++++++
 .../scheduler/ReconciliationCoordinator.scala   | 164 +++++
 .../flink/mesos/scheduler/TaskMonitor.scala     | 240 ++++++
 .../apache/flink/mesos/scheduler/Tasks.scala    |  96 +++
 .../ContaineredJobManager.scala                 | 174 +++++
 .../MesosFlinkResourceManagerTest.java          | 697 ++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  32 +
 flink-mesos/src/test/resources/logback-test.xml |  37 +
 .../scala/org/apache/flink/mesos/Utils.scala    |  34 +
 .../mesos/scheduler/LaunchCoordinatorTest.scala | 421 +++++++++++
 .../ReconciliationCoordinatorTest.scala         | 214 ++++++
 .../flink/mesos/scheduler/TaskMonitorTest.scala | 237 ++++++
 .../org/apache/flink/runtime/akka/FSMSpec.scala |  40 +
 .../flink/runtime/util/ZooKeeperUtils.java      |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 pom.xml                                         |   1 +
 51 files changed, 6446 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 514c730..2fe27e0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -426,6 +426,60 @@ public final class ConfigConstants {
        public static final String YARN_APPLICATION_MASTER_PORT = 
"yarn.application-master.port";
 
 
+       // ------------------------ Mesos Configuration ------------------------
+
+       /**
+        * The maximum number of failed Mesos tasks before entirely stopping
+        * the Mesos session / job on Mesos.
+        *
+        * By default, we take the number of initially requested tasks.
+        */
+       public static final String MESOS_MAX_FAILED_TASKS = 
"mesos.maximum-failed-tasks";
+
+       /**
+        * The Mesos master URL.
+        *
+        * The value should be in one of the following forms:
+        * <pre>
+        * {@code
+        *     host:port
+        *     zk://host1:port1,host2:port2,.../path
+        *     zk://username:password@host1:port1,host2:port2,.../path
+        *     file:///path/to/file (where file contains one of the above)
+        * }
+        * </pre>
+        *
+        */
+       public static final String MESOS_MASTER_URL = "mesos.master";
+
+       /**
+        * The failover timeout for the Mesos scheduler, after which running 
tasks are automatically shut down.
+        *
+        * The default value is 600 (seconds).
+        */
+       public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = 
"mesos.failover-timeout";
+
+       /**
+        * The config parameter defining the Mesos artifact server port to use.
+        * Setting the port to 0 will let the OS choose an available port.
+        */
+       public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = 
"mesos.resourcemanager.artifactserver.port";
+
+       public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = 
"mesos.resourcemanager.framework.name";
+
+       public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = 
"mesos.resourcemanager.framework.role";
+
+       public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = 
"mesos.resourcemanager.framework.principal";
+
+       public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = 
"mesos.resourcemanager.framework.secret";
+
+       /**
+        * The number of CPUs to acquire from Mesos.
+        *
+        * By default, we use the number of requested task slots.
+        */
+       public static final String MESOS_RESOURCEMANAGER_TASKS_CPUS = 
"mesos.resourcemanager.tasks.cpus";
+
        // ------------------------ Hadoop Configuration 
------------------------
 
        /**
@@ -736,6 +790,9 @@ public final class ConfigConstants {
        @Deprecated
        public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = 
"recovery.zookeeper.path.checkpoint-counter";
 
+       /** ZooKeeper root path (ZNode) for Mesos workers. */
+       public static final String ZOOKEEPER_MESOS_WORKERS_PATH = 
"recovery.zookeeper.path.mesos-workers";
+
        /** Deprecated in favour of {@link #HA_ZOOKEEPER_SESSION_TIMEOUT}. */
        @Deprecated
        public static final String ZOOKEEPER_SESSION_TIMEOUT = 
"recovery.zookeeper.client.session-timeout";
@@ -983,6 +1040,23 @@ public final class ConfigConstants {
         */
        public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";
 
+       // ------ Mesos-Specific Configuration ------
+
+       /** The default failover timeout provided to Mesos (10 mins) */
+       public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60;
+
+       /**
+        * The default network port to listen on for the Mesos artifact server.
+        */
+       public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0;
+
+       /**
+        * The default Mesos framework name for the ResourceManager to use.
+        */
+       public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME 
= "Flink";
+
+       public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE 
= "*";
+
        // ------------------------ File System Behavior 
------------------------
 
        /**
@@ -1131,6 +1205,8 @@ public final class ConfigConstants {
 
        public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = 
"/checkpoint-counter";
 
+       public static final String DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH = 
"/mesos-workers";
+
        public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
 
        public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 539ca8e..ec84adc 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -119,8 +119,13 @@ under the License.
                        <artifactId>flink-metrics-jmx</artifactId>
                        <version>${project.version}</version>
                </dependency>
+        
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-mesos_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
                
-
        </dependencies>
 
        <!-- See main pom.xml for explanation of profiles -->

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
new file mode 100644
index 0000000..c344ab2
--- /dev/null
+++ b/flink-mesos/pom.xml
@@ -0,0 +1,294 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+       
+       <artifactId>flink-mesos_2.10</artifactId>
+       <name>flink-mesos</name>
+       <packaging>jar</packaging>
+
+    <properties>
+        <mesos.version>0.27.1</mesos.version>
+    </properties>
+       
+    <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>hadoop-core</artifactId>
+                                       <groupId>org.apache.hadoop</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>${shading-artifact.name}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       
<artifactId>akka-actor_${scala.binary.version}</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       
<artifactId>akka-remote_${scala.binary.version}</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       
<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+               </dependency>
+
+               <!--<dependency>
+                       <groupId>com.typesafe.akka</groupId>
+                       
<artifactId>akka-camel_${scala.binary.version}</artifactId>
+               </dependency>-->
+
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.curator</groupId>
+                       <artifactId>curator-test</artifactId>
+                       <version>${curator.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-curator-recipes</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <!-- ===================================================
+        Dependencies for Mesos
+               =================================================== -->
+
+               <dependency>
+                       <groupId>org.apache.mesos</groupId>
+                       <artifactId>mesos</artifactId>
+                       <version>${mesos.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.netflix.fenzo</groupId>
+                       <artifactId>fenzo-core</artifactId>
+                       <version>0.9.3</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>tv.cntt</groupId>
+                       <artifactId>netty-router</artifactId>
+                       <version>1.10</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.scalatest</groupId>
+                       
<artifactId>scalatest_${scala.binary.version}</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the 
process-resources phase, so that dependencies on
+                                               scala classes can be resolved 
later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run scala compiler in the 
process-test-resources phase, so that dependencies on
+                                                scala classes can be resolved 
later in the (Java) test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               
<phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               
<exclude>org.scala-lang:scala-library</exclude>
+                                               
<exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               
<sourceInclude>**/*.scala</sourceInclude>
+                                               
<sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               
<artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               
<phase>generate-test-sources</phase>
+                                               <goals>
+                                                       
<goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Scala Code Style, most of the configuration done 
via plugin management -->
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <configuration>
+                                       
<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Relocate curator -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <artifactSet>
+                                                               <includes 
combine.children="append">
+                                                                       
<include>org.apache.flink:flink-shaded-curator-recipes</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations 
combine.children="append">
+                                                               <relocation>
+                                                                       
<pattern>org.apache.curator</pattern>
+                                                                       
<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
+                                                               </relocation>
+                                                       </relocations>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java 
b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
new file mode 100644
index 0000000..2509465
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -0,0 +1,49 @@
+package org.apache.flink.mesos;
+
+import org.apache.mesos.Protos;
+
+import java.net.URL;
+import java.util.Arrays;
+
+public class Utils {
+       /**
+        * Construct a Mesos environment variable.
+     */
+       public static Protos.Environment.Variable variable(String name, String 
value) {
+               return Protos.Environment.Variable.newBuilder()
+                       .setName(name)
+                       .setValue(value)
+                       .build();
+       }
+
+       /**
+        * Construct a Mesos URI.
+     */
+       public static Protos.CommandInfo.URI uri(URL url, boolean cacheable) {
+               return Protos.CommandInfo.URI.newBuilder()
+                       .setValue(url.toExternalForm())
+                       .setExtract(false)
+                       .setCache(cacheable)
+                       .build();
+       }
+
+       public static Protos.Resource scalar(String name, double value) {
+               return Protos.Resource.newBuilder()
+                       .setName(name)
+                       .setType(Protos.Value.Type.SCALAR)
+                       
.setScalar(Protos.Value.Scalar.newBuilder().setValue(value))
+                       .build();
+       }
+
+       public static Protos.Value.Range range(long begin, long end) {
+               return 
Protos.Value.Range.newBuilder().setBegin(begin).setEnd(end).build();
+       }
+
+       public static Protos.Resource ranges(String name, Protos.Value.Range... 
ranges) {
+               return Protos.Resource.newBuilder()
+                       .setName(name)
+                       .setType(Protos.Value.Type.RANGES)
+                       
.setRanges(Protos.Value.Ranges.newBuilder().addAllRange(Arrays.asList(ranges)).build())
+                       .build();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
new file mode 100644
index 0000000..b767344
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
@@ -0,0 +1,41 @@
+package org.apache.flink.mesos.cli;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkMesosSessionCli {
+
+       private static final ObjectMapper mapper = new ObjectMapper();
+
+       public static Configuration decodeDynamicProperties(String 
dynamicPropertiesEncoded) {
+               try {
+                       Configuration configuration = new Configuration();
+                       if(dynamicPropertiesEncoded != null) {
+                               TypeReference<Map<String, String>> typeRef = 
new TypeReference<Map<String, String>>() {};
+                               Map<String,String> props = 
mapper.readValue(dynamicPropertiesEncoded, typeRef);
+                               for (Map.Entry<String, String> property : 
props.entrySet()) {
+                                       
configuration.setString(property.getKey(), property.getValue());
+                               }
+                       }
+                       return configuration;
+               }
+               catch(IOException ex) {
+                       throw new IllegalArgumentException("unreadable encoded 
properties", ex);
+               }
+       }
+
+       public static String encodeDynamicProperties(Configuration 
configuration) {
+               try {
+                       String dynamicPropertiesEncoded = 
mapper.writeValueAsString(configuration.toMap());
+                       return dynamicPropertiesEncoded;
+               }
+               catch (JsonProcessingException ex) {
+                       throw new IllegalArgumentException("unwritable 
properties", ex);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
new file mode 100644
index 0000000..8abd79a
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -0,0 +1,187 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+       /**
+        * The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+        */
+       private static String[] TM_PORT_KEYS = {
+               "taskmanager.rpc.port",
+               "taskmanager.data.port" };
+
+       private final MesosTaskManagerParameters params;
+       private final Protos.TaskInfo.Builder template;
+       private final Protos.TaskID taskID;
+       private final Request taskRequest;
+
+       /**
+        * Construct a launchable Mesos worker.
+        * @param params the TM parameters such as memory, cpu to acquire.
+        * @param template a template for the TaskInfo to be constructed at 
launch time.
+        * @param taskID the taskID for this worker.
+     */
+       public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+               this.params = params;
+               this.template = template;
+               this.taskID = taskID;
+               this.taskRequest = new Request();
+       }
+
+       public Protos.TaskID taskID() {
+               return taskID;
+       }
+
+       @Override
+       public TaskRequest taskRequest() {
+               return taskRequest;
+       }
+
+       class Request implements TaskRequest {
+               private final AtomicReference<TaskRequest.AssignedResources> 
assignedResources = new AtomicReference<>();
+
+               @Override
+               public String getId() {
+                       return taskID.getValue();
+               }
+
+               @Override
+               public String taskGroupName() {
+                       return "";
+               }
+
+               @Override
+               public double getCPUs() {
+                       return params.cpus();
+               }
+
+               @Override
+               public double getMemory() {
+                       return 
params.containeredParameters().taskManagerTotalMemoryMB();
+               }
+
+               @Override
+               public double getNetworkMbps() {
+                       return 0.0;
+               }
+
+               @Override
+               public double getDisk() {
+                       return 0.0;
+               }
+
+               @Override
+               public int getPorts() {
+                       return TM_PORT_KEYS.length;
+               }
+
+               @Override
+               public Map<String, NamedResourceSetRequest> 
getCustomNamedResources() {
+                       return Collections.emptyMap();
+               }
+
+               @Override
+               public List<? extends ConstraintEvaluator> getHardConstraints() 
{
+                       return null;
+               }
+
+               @Override
+               public List<? extends VMTaskFitnessCalculator> 
getSoftConstraints() {
+                       return null;
+               }
+
+               @Override
+               public void setAssignedResources(AssignedResources 
assignedResources) {
+                       this.assignedResources.set(assignedResources);
+               }
+
+               @Override
+               public AssignedResources getAssignedResources() {
+                       return assignedResources.get();
+               }
+
+               @Override
+               public String toString() {
+                       return "Request{" +
+                               "cpus=" + getCPUs() +
+                               "memory=" + getMemory() +
+                               '}';
+               }
+       }
+
+       /**
+        * Construct the TaskInfo needed to launch the worker.
+        * @param slaveId the assigned slave.
+        * @param assignment the assignment details.
+     * @return a fully-baked TaskInfo.
+     */
+       @Override
+       public Protos.TaskInfo launch(Protos.SlaveID slaveId, 
TaskAssignmentResult assignment) {
+
+               final Configuration dynamicProperties = new Configuration();
+
+               // specialize the TaskInfo template with assigned resources, 
environment variables, etc
+               final Protos.TaskInfo.Builder taskInfo = template
+                       .clone()
+                       .setSlaveId(slaveId)
+                       .setTaskId(taskID)
+                       .setName(taskID.getValue())
+                       .addResources(scalar("cpus", 
assignment.getRequest().getCPUs()))
+                       .addResources(scalar("mem", 
assignment.getRequest().getMemory()));
+               //.addResources(scalar("disk", 
assignment.getRequest.getDisk).setRole("Flink"))
+
+               // use the assigned ports for the TM
+               if (assignment.getAssignedPorts().size() != 
TM_PORT_KEYS.length) {
+                       throw new IllegalArgumentException("unsufficient # of 
ports assigned");
+               }
+               for (int i = 0; i < TM_PORT_KEYS.length; i++) {
+                       int port = assignment.getAssignedPorts().get(i);
+                       String key = TM_PORT_KEYS[i];
+                       taskInfo.addResources(ranges("ports", range(port, 
port)));
+                       dynamicProperties.setInteger(key, port);
+               }
+
+               // finalize environment variables
+               final Protos.Environment.Builder environmentBuilder = 
taskInfo.getCommandBuilder().getEnvironmentBuilder();
+
+               // propagate the Mesos task ID to the TM
+               environmentBuilder
+                       
.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, 
taskInfo.getTaskId().getValue()));
+
+               // propagate the dynamic configuration properties to the TM
+               String dynamicPropertiesEncoded = 
FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties);
+               environmentBuilder
+                       
.addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, 
dynamicPropertiesEncoded));
+
+               return taskInfo.build();
+       }
+
+       @Override
+       public String toString() {
+               return "LaunchableMesosWorker{" +
+                       "taskID=" + taskID +
+                       "taskRequest=" + taskRequest +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
new file mode 100644
index 0000000..30f2258
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -0,0 +1,600 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
/**
 * This class is the executable entry point for the Mesos Application Master.
 * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
 * and {@link MesosFlinkResourceManager}.
 *
 * <p>The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
 * allocation and failure detection.
 */
+public class MesosApplicationMasterRunner {
+       /** Logger */
+       protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+       /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+        * before they quit */
+       private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+       /** The process environment variables */
+       private static final Map<String, String> ENV = System.getenv();
+
+       /** The exit code returned if the initialization of the application 
master failed */
+       private static final int INIT_ERROR_EXIT_CODE = 31;
+
+       /** The exit code returned if the process exits because a critical 
actor died */
+       private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+       // 
------------------------------------------------------------------------
+       //  Program entry point
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The entry point for the Mesos AppMaster.
+        *
+        * @param args The command line arguments.
+        */
+       public static void main(String[] args) {
+               EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos 
AppMaster", args);
+               SignalHandler.register(LOG);
+
+               // run and exit with the proper return code
+               int returnCode = new MesosApplicationMasterRunner().run(args);
+               System.exit(returnCode);
+       }
+
+       /**
+        * The instance entry point for the Mesos AppMaster. Obtains user group
+        * information and calls the main work method {@link #runPrivileged()} 
as a
+        * privileged action.
+        *
+        * @param args The command line arguments.
+        * @return The process exit code.
+        */
+       protected int run(String[] args) {
+               try {
+                       LOG.debug("All environment variables: {}", ENV);
+
+                       final UserGroupInformation currentUser;
+                       try {
+                               currentUser = 
UserGroupInformation.getCurrentUser();
+                       } catch (Throwable t) {
+                               throw new Exception("Cannot access 
UserGroupInformation information for current user", t);
+                       }
+
+                       LOG.info("Running Flink as user {}", 
currentUser.getShortUserName());
+
+                       // run the actual work in a secured privileged action
+                       return currentUser.doAs(new PrivilegedAction<Integer>() 
{
+                               @Override
+                               public Integer run() {
+                                       return runPrivileged();
+                               }
+                       });
+               }
+               catch (Throwable t) {
+                       // make sure that everything whatever ends up in the log
+                       LOG.error("Mesos AppMaster initialization failed", t);
+                       return INIT_ERROR_EXIT_CODE;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Core work method
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The main work method, must run as a privileged action.
+        *
+        * @return The return code for the Java process.
+        */
+       protected int runPrivileged() {
+
+               ActorSystem actorSystem = null;
+               WebMonitor webMonitor = null;
+               MesosArtifactServer artifactServer = null;
+
+               try {
+                       // ------- (1) load and parse / validate all 
configurations -------
+
+                       // loading all config values here has the advantage 
that the program fails fast, if any
+                       // configuration problem occurs
+
+                       final String workingDir = 
ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
+                       require(workingDir != null, "Sandbox directory variable 
(%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+
+                       final String sessionID = 
ENV.get(MesosConfigKeys.ENV_SESSION_ID);
+                       require(sessionID != null, "Session ID (%s) not set", 
MesosConfigKeys.ENV_SESSION_ID);
+
+                       // Note that we use the "appMasterHostname" given by 
the system, to make sure
+                       // we use the hostnames consistently throughout akka.
+                       // for akka "localhost" and "localhost.localdomain" are 
different actors.
+                       final String appMasterHostname = 
InetAddress.getLocalHost().getHostName();
+
+                       // Flink configuration
+                       final Configuration dynamicProperties =
+                               
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+                       LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+
+                       final Configuration config = 
createConfiguration(workingDir, dynamicProperties);
+
+                       // Mesos configuration
+                       final MesosConfiguration mesosConfig = 
createMesosConfig(config, appMasterHostname);
+
+                       // environment values related to TM
+                       final int taskManagerContainerMemory;
+                       final int numInitialTaskManagers;
+                       final int slotsPerTaskManager;
+
+                       try {
+                               taskManagerContainerMemory = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY));
+                       } catch (NumberFormatException e) {
+                               throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_TM_MEMORY + " : "
+                                       + e.getMessage());
+                       }
+                       try {
+                               numInitialTaskManagers = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT));
+                       } catch (NumberFormatException e) {
+                               throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_TM_COUNT + " : "
+                                       + e.getMessage());
+                       }
+                       try {
+                               slotsPerTaskManager = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS));
+                       } catch (NumberFormatException e) {
+                               throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_SLOTS + " : "
+                                       + e.getMessage());
+                       }
+
+                       final ContaineredTaskManagerParameters 
containeredParameters =
+                               ContaineredTaskManagerParameters.create(config, 
taskManagerContainerMemory, slotsPerTaskManager);
+
+                       final MesosTaskManagerParameters taskManagerParameters =
+                               MesosTaskManagerParameters.create(config, 
containeredParameters);
+
+                       LOG.info("TaskManagers will be created with {} task 
slots",
+                               
taskManagerParameters.containeredParameters().numSlots());
+                       LOG.info("TaskManagers will be started with container 
size {} MB, JVM heap size {} MB, " +
+                                       "JVM direct memory limit {} MB, {} 
cpus",
+                               
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
+                               
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
+                               
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
+                               taskManagerParameters.cpus());
+
+                       // JM endpoint, which should be explicitly configured 
by the dispatcher (based on acquired net resources)
+                       final int listeningPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+                               ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+                       require(listeningPort >= 0 && listeningPort <= 65536, 
"Config parameter \"" +
+                               ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" 
is invalid, it must be between 0 and 65536");
+
+                       // ----------------- (2) start the actor system 
-------------------
+
+                       // try to start the actor system, JobManager and 
JobManager actor system
+                       // using the configured address and ports
+                       actorSystem = BootstrapTools.startActorSystem(config, 
appMasterHostname, listeningPort, LOG);
+
+                       final String akkaHostname = 
AkkaUtils.getAddress(actorSystem).host().get();
+                       final int akkaPort = (Integer) 
AkkaUtils.getAddress(actorSystem).port().get();
+
+                       LOG.info("Actor system bound to hostname {}.", 
akkaHostname);
+
+                       // try to start the artifact server
+                       LOG.debug("Starting Artifact Server");
+                       final int artifactServerPort = 
config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
+                               
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
+                       artifactServer = new MesosArtifactServer(sessionID, 
akkaHostname, artifactServerPort);
+
+                       // ----------------- (3) Generate the configuration for 
the TaskManagers -------------------
+
+                       final Configuration taskManagerConfig = 
BootstrapTools.generateTaskManagerConfiguration(
+                               config, akkaHostname, akkaPort, 
slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
+                       LOG.debug("TaskManager configuration: {}", 
taskManagerConfig);
+
+                       final Protos.TaskInfo.Builder taskManagerContext = 
createTaskManagerContext(
+                               config, mesosConfig, ENV,
+                               taskManagerParameters, taskManagerConfig,
+                               workingDir, getTaskManagerClass(), 
artifactServer, LOG);
+
+                       // ----------------- (4) start the actors 
-------------------
+
+                       // 1) JobManager & Archive (in non-HA case, the leader 
service takes this)
+                       // 2) Web Monitor (we need its port to register)
+                       // 3) Resource Master for Mesos
+                       // 4) Process reapers for the JobManager and Resource 
Master
+
+                       // 1: the JobManager
+                       LOG.debug("Starting JobManager actor");
+
+                       // we start the JobManager with its standard name
+                       ActorRef jobManager = JobManager.startJobManagerActors(
+                               config, actorSystem,
+                               new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
+                               scala.Option.<String>empty(),
+                               getJobManagerClass(),
+                               getArchivistClass())._1();
+
+
+                       // 2: the web monitor
+                       LOG.debug("Starting Web Frontend");
+
+                       webMonitor = 
BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, 
LOG);
+                       if(webMonitor != null) {
+                               final URL webMonitorURL = new URL("http", 
appMasterHostname, webMonitor.getServerPort(), "/");
+                               
mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
+                       }
+
+                       // 3: Flink's Mesos ResourceManager
+                       LOG.debug("Starting Mesos Flink Resource Manager");
+
+                       // create the worker store to persist task information 
across restarts
+                       MesosWorkerStore workerStore = 
createWorkerStore(config);
+
+                       // we need the leader retrieval service here to be 
informed of new
+                       // leader session IDs, even though there can be only 
one leader ever
+                       LeaderRetrievalService leaderRetriever =
+                               
LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
+
+                       Props resourceMasterProps = 
MesosFlinkResourceManager.createActorProps(
+                               getResourceManagerClass(),
+                               config,
+                               mesosConfig,
+                               workerStore,
+                               leaderRetriever,
+                               taskManagerParameters,
+                               taskManagerContext,
+                               numInitialTaskManagers,
+                               LOG);
+
+                       ActorRef resourceMaster = 
actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");
+
+
+                       // 4: Process reapers
+                       // The process reapers ensure that upon unexpected 
actor death, the process exits
+                       // and does not stay lingering around unresponsive
+
+                       LOG.debug("Starting process reapers for JobManager");
+
+                       actorSystem.actorOf(
+                               Props.create(ProcessReaper.class, 
resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
+                               "Mesos_Resource_Master_Process_Reaper");
+
+                       actorSystem.actorOf(
+                               Props.create(ProcessReaper.class, jobManager, 
LOG, ACTOR_DIED_EXIT_CODE),
+                               "JobManager_Process_Reaper");
+               }
+               catch (Throwable t) {
+                       // make sure that everything whatever ends up in the log
+                       LOG.error("Mesos JobManager initialization failed", t);
+
+                       if (actorSystem != null) {
+                               try {
+                                       actorSystem.shutdown();
+                               } catch (Throwable tt) {
+                                       LOG.error("Error shutting down actor 
system", tt);
+                               }
+                       }
+
+                       if (webMonitor != null) {
+                               try {
+                                       webMonitor.stop();
+                               } catch (Throwable ignored) {
+                                       LOG.warn("Failed to stop the web 
frontend", ignored);
+                               }
+                       }
+
+                       if(artifactServer != null) {
+                               try {
+                                       artifactServer.stop();
+                               } catch (Throwable ignored) {
+                                       LOG.error("Failed to stop the artifact 
server", ignored);
+                               }
+                       }
+
+                       return INIT_ERROR_EXIT_CODE;
+               }
+
+               // everything started, we can wait until all is done or the 
process is killed
+               LOG.info("Mesos JobManager started");
+
+               // wait until everything is done
+               actorSystem.awaitTermination();
+
+               // if we get here, everything work out jolly all right, and we 
even exited smoothly
+               if (webMonitor != null) {
+                       try {
+                               webMonitor.stop();
+                       } catch (Throwable t) {
+                               LOG.error("Failed to stop the web frontend", t);
+                       }
+               }
+
+               try {
+                       artifactServer.stop();
+               } catch (Throwable t) {
+                       LOG.error("Failed to stop the artifact server", t);
+               }
+
+               return 0;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  For testing, this allows to override the actor classes used for
+       //  JobManager and the archive of completed jobs
+       // 
------------------------------------------------------------------------
+
+       protected Class<? extends MesosFlinkResourceManager> 
getResourceManagerClass() {
+               return MesosFlinkResourceManager.class;
+       }
+
+       protected Class<? extends JobManager> getJobManagerClass() {
+               return MesosJobManager.class;
+       }
+
+       protected Class<? extends MemoryArchivist> getArchivistClass() {
+               return MemoryArchivist.class;
+       }
+
+       protected Class<? extends TaskManager> getTaskManagerClass() {
+               return MesosTaskManager.class;
+       }
+
+       /**
+        * Validates a condition, throwing a RuntimeException if the condition 
is violated.
+        *
+        * @param condition The condition.
+        * @param message The message for the runtime exception, with format 
variables as defined by
+        *                {@link String#format(String, Object...)}.
+        * @param values The format arguments.
+        */
+       private static void require(boolean condition, String message, 
Object... values) {
+               if (!condition) {
+                       throw new RuntimeException(String.format(message, 
values));
+               }
+       }
+
+       /**
+        *
+        * @param baseDirectory
+        * @param additional
+        *
+        * @return The configuration to be used by the TaskManagers.
+        */
+       @SuppressWarnings("deprecation")
+       private static Configuration createConfiguration(String baseDirectory, 
Configuration additional) {
+               LOG.info("Loading config from directory " + baseDirectory);
+
+               Configuration configuration = 
GlobalConfiguration.loadConfiguration(baseDirectory);
+
+               
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+
+               // add dynamic properties to JobManager configuration.
+               configuration.addAll(additional);
+
+               return configuration;
+       }
+
+       /**
+        * Loads and validates the ResourceManager Mesos configuration from the 
given Flink configuration.
+        */
+       public static MesosConfiguration createMesosConfig(Configuration 
flinkConfig, String hostname) {
+
+               Protos.FrameworkInfo.Builder frameworkInfo = 
Protos.FrameworkInfo.newBuilder()
+                       .setUser("")
+                       .setHostname(hostname);
+               Protos.Credential.Builder credential = null;
+
+               if(!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
+                       throw new 
IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be 
configured.");
+               }
+               String masterUrl = 
flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
+
+               Duration failoverTimeout = FiniteDuration.apply(
+                       flinkConfig.getInteger(
+                               ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS,
+                               
ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS),
+                       TimeUnit.SECONDS);
+               frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
+
+               frameworkInfo.setName(flinkConfig.getString(
+                       ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME,
+                       
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME));
+
+               frameworkInfo.setRole(flinkConfig.getString(
+                       ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
+                       
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
+
+               
if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL))
 {
+                       frameworkInfo.setPrincipal(flinkConfig.getString(
+                               
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
+
+                       credential = Protos.Credential.newBuilder();
+                       credential.setPrincipal(frameworkInfo.getPrincipal());
+
+                       
if(!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET))
 {
+                               throw new 
IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET
 + " must be configured.");
+                       }
+                       credential.setSecret(flinkConfig.getString(
+                               
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
+               }
+
+               MesosConfiguration mesos =
+                       new MesosConfiguration(masterUrl, frameworkInfo, 
Option.apply(credential));
+
+               return mesos;
+       }
+
+       private MesosWorkerStore createWorkerStore(Configuration flinkConfig) 
throws Exception {
+               MesosWorkerStore workerStore;
+               RecoveryMode recoveryMode = 
RecoveryMode.fromConfig(flinkConfig);
+               if (recoveryMode == RecoveryMode.STANDALONE) {
+                       workerStore = new StandaloneMesosWorkerStore();
+               }
+               else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
+                       // note: the store is responsible for closing the 
client.
+                       CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(flinkConfig);
+                       workerStore = 
ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig);
+               }
+               else {
+                       throw new IllegalConfigurationException("Unexpected 
recovery mode '" + recoveryMode + ".");
+               }
+
+               return workerStore;
+       }
+
+       /**
+        * Creates a Mesos task info template, which describes how to bring up 
a TaskManager process as
+        * a Mesos task.
+        *
+        * <p>This code is extremely Mesos-specific and registers all the 
artifacts that the TaskManager
+        * needs (such as JAR file, config file, ...) and all environment 
variables in a task info record.
+        * The Mesos fetcher then ensures that those artifacts will be copied 
into the task's sandbox directory.
+        * A lightweight HTTP server serves the artifacts to the fetcher.
+        *
+        * <p>We do this work before we start the ResourceManager actor in 
order to fail early if
+        * any of the operations here fail.
+        *
+        * @param flinkConfig
+        *         The Flink configuration object.
+        * @param mesosConfig
+        *         The Mesos configuration object.
+        * @param env
+        *         The environment variables.
+        * @param tmParams
+        *         The TaskManager container memory parameters.
+        * @param taskManagerConfig
+        *         The configuration for the TaskManagers.
+        * @param workingDirectory
+        *         The current application master container's working directory.
+        * @param taskManagerMainClass
+        *         The class with the main method.
+        * @param artifactServer
+        *         The artifact server.
+        * @param log
+        *         The logger.
+        *
+        * @return The task info template for the TaskManager processes.
+        *
+        * @throws Exception Thrown if the task info could not be created, for 
example if
+        *                   the resources could not be copied.
+        */
+       public static Protos.TaskInfo.Builder createTaskManagerContext(
+               Configuration flinkConfig,
+               MesosConfiguration mesosConfig,
+               Map<String, String> env,
+               MesosTaskManagerParameters tmParams,
+               Configuration taskManagerConfig,
+               String workingDirectory,
+               Class<?> taskManagerMainClass,
+               MesosArtifactServer artifactServer,
+               Logger log) throws Exception {
+
+
+               Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder();
+               Protos.CommandInfo.Builder cmd = 
Protos.CommandInfo.newBuilder();
+
+               log.info("Setting up artifacts for TaskManagers");
+
+               String shipListString = 
env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
+               require(shipListString != null, "Environment variable %s not 
set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+               String clientUsername = 
env.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+               require(clientUsername != null, "Environment variable %s not 
set", MesosConfigKeys.ENV_CLIENT_USERNAME);
+
+               String classPathString = 
env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
+               require(classPathString != null, "Environment variable %s not 
set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
+
+               // register the Flink jar
+               final File flinkJarFile = new File(workingDirectory, 
"flink.jar");
+               cmd.addUris(uri(artifactServer.addFile(flinkJarFile, 
"flink.jar"), true));
+
+               // register the TaskManager configuration
+               final File taskManagerConfigFile =
+                       new File(workingDirectory, UUID.randomUUID() + 
"-taskmanager-conf.yaml");
+               LOG.debug("Writing TaskManager configuration to {}", 
taskManagerConfigFile.getAbsolutePath());
+               BootstrapTools.writeConfiguration(taskManagerConfig, 
taskManagerConfigFile);
+               cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, 
GlobalConfiguration.FLINK_CONF_FILENAME), true));
+
+               // prepare additional files to be shipped
+               for (String pathStr : shipListString.split(",")) {
+                       if (!pathStr.isEmpty()) {
+                               File shipFile = new File(workingDirectory, 
pathStr);
+                               
cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true));
+                       }
+               }
+
+               log.info("Creating task info for TaskManagers");
+
+               // build the launch command
+               boolean hasLogback = new File(workingDirectory, 
"logback.xml").exists();
+               boolean hasLog4j = new File(workingDirectory, 
"log4j.properties").exists();
+
+               String launchCommand = 
BootstrapTools.getTaskManagerShellCommand(
+                       flinkConfig, tmParams.containeredParameters(), ".", ".",
+                       hasLogback, hasLog4j, taskManagerMainClass);
+               cmd.setValue(launchCommand);
+
+               // build the environment variables
+               Protos.Environment.Builder envBuilder = 
Protos.Environment.newBuilder();
+               for (Map.Entry<String, String> entry : 
tmParams.containeredParameters().taskManagerEnv().entrySet()) {
+                       envBuilder.addVariables(variable(entry.getKey(), 
entry.getValue()));
+               }
+               envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, 
classPathString));
+               
envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLIENT_USERNAME, 
clientUsername));
+
+               cmd.setEnvironment(envBuilder);
+
+               info.setCommand(cmd);
+
+               return info;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
new file mode 100644
index 0000000..3173286
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -0,0 +1,26 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
/**
 * Names of the environment variables used to pass settings to the Mesos containers.
 *
 * <p>This is a pure constants holder and cannot be instantiated.
 */
public class MesosConfigKeys {

	// ------------------------------------------------------------------------
	//  Settings supplied by the client
	// ------------------------------------------------------------------------

	public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";

	public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";

	/** Comma-separated list of additional files to ship to the TaskManagers. */
	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";

	/** Name of the submitting user; it is forwarded to the TaskManager environment. */
	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";

	public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID";

	// ------------------------------------------------------------------------
	//  Flink-internal settings
	// ------------------------------------------------------------------------

	public static final String ENV_SLOTS = "_SLOTS";

	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";

	public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";

	public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";

	/** Flink class path; it is handed to the TaskManagers as {@link #ENV_CLASSPATH}. */
	public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";

	// ------------------------------------------------------------------------
	//  Standard variables
	// ------------------------------------------------------------------------

	public static final String ENV_CLASSPATH = "CLASSPATH";

	/** Presumably the sandbox directory variable provided by Mesos (TODO confirm). */
	public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX";

	/** Not instantiable: constants only. */
	private MesosConfigKeys() {}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
new file mode 100644
index 0000000..483c7b7
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -0,0 +1,737 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMesosWorkerNode> {
+
+       /** The Mesos configuration (master and framework info) */
+       private final MesosConfiguration mesosConfig;
+
+       /** The TaskManager container parameters (like container memory size) */
+       private final MesosTaskManagerParameters taskManagerParameters;
+
+       /** Context information used to start a TaskManager Java process */
+       private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+       /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+       private final int maxFailedTasks;
+
+       /** Callback handler for the asynchronous Mesos scheduler */
+       private SchedulerProxy schedulerCallbackHandler;
+
+       /** Mesos scheduler driver */
+       private SchedulerDriver schedulerDriver;
+
+       private ActorRef connectionMonitor;
+
+       private ActorRef taskRouter;
+
+       private ActorRef launchCoordinator;
+
+       private ActorRef reconciliationCoordinator;
+
+       private MesosWorkerStore workerStore;
+
+       final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
+       final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
+       final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
+
+       /** The number of failed tasks since the master became active */
+       private int failedTasksSoFar;
+
+       public MesosFlinkResourceManager(
+               Configuration flinkConfig,
+               MesosConfiguration mesosConfig,
+               MesosWorkerStore workerStore,
+               LeaderRetrievalService leaderRetrievalService,
+               MesosTaskManagerParameters taskManagerParameters,
+               Protos.TaskInfo.Builder taskManagerLaunchContext,
+               int maxFailedTasks,
+               int numInitialTaskManagers) {
+
+               super(numInitialTaskManagers, flinkConfig, 
leaderRetrievalService);
+
+               this.mesosConfig = requireNonNull(mesosConfig);
+
+               this.workerStore = requireNonNull(workerStore);
+
+               this.taskManagerParameters = 
requireNonNull(taskManagerParameters);
+               this.taskManagerLaunchContext = 
requireNonNull(taskManagerLaunchContext);
+               this.maxFailedTasks = maxFailedTasks;
+
+               this.workersInNew = new HashMap<>();
+               this.workersInLaunch = new HashMap<>();
+               this.workersBeingReturned = new HashMap<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Mesos-specific behavior
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected void initialize() throws Exception {
+               LOG.info("Initializing Mesos resource master");
+
+               workerStore.start();
+
+               // create the scheduler driver to communicate with Mesos
+               schedulerCallbackHandler = new SchedulerProxy(self());
+
+               // register with Mesos
+               FrameworkInfo.Builder frameworkInfo = 
mesosConfig.frameworkInfo()
+                       .clone()
+                       .setCheckpoint(true);
+
+               Option<Protos.FrameworkID> frameworkID = 
workerStore.getFrameworkID();
+               if(frameworkID.isEmpty()) {
+                       LOG.info("Registering as new framework.");
+               }
+               else {
+                       LOG.info("Recovery scenario: re-registering using 
framework ID {}.", frameworkID.get().getValue());
+                       frameworkInfo.setId(frameworkID.get());
+               }
+
+               MesosConfiguration initializedMesosConfig = 
mesosConfig.withFrameworkInfo(frameworkInfo);
+               MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+               schedulerDriver = 
initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
+
+               // create supporting actors
+               connectionMonitor = createConnectionMonitor();
+               launchCoordinator = createLaunchCoordinator();
+               reconciliationCoordinator = createReconciliationCoordinator();
+               taskRouter = createTaskRouter();
+
+               recoverWorkers();
+
+               connectionMonitor.tell(new ConnectionMonitor.Start(), self());
+               schedulerDriver.start();
+       }
+
+       protected ActorRef createConnectionMonitor() {
+               return context().actorOf(
+                       
ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
+                       "connectionMonitor");
+       }
+
+       protected ActorRef createTaskRouter() {
+               return context().actorOf(
+                       Tasks.createActorProps(Tasks.class, config, 
schedulerDriver, TaskMonitor.class),
+                       "tasks");
+       }
+
+       protected ActorRef createLaunchCoordinator() {
+               return context().actorOf(
+                       
LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, 
schedulerDriver, createOptimizer()),
+                       "launchCoordinator");
+       }
+
+       protected ActorRef createReconciliationCoordinator() {
+               return context().actorOf(
+                       
ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, 
config, schedulerDriver),
+                       "reconciliationCoordinator");
+       }
+
+       @Override
+       public void postStop() {
+               LOG.info("Stopping Mesos resource master");
+               super.postStop();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Actor messages
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected void handleMessage(Object message) {
+
+               // check for Mesos-specific actor messages first
+
+               // --- messages about Mesos connection
+               if (message instanceof Registered) {
+                       registered((Registered) message);
+               } else if (message instanceof ReRegistered) {
+                       reregistered((ReRegistered) message);
+               } else if (message instanceof Disconnected) {
+                       disconnected((Disconnected) message);
+               } else if (message instanceof Error) {
+                       error(((Error) message).message());
+
+               // --- messages about offers
+               } else if (message instanceof ResourceOffers || message 
instanceof OfferRescinded) {
+                       launchCoordinator.tell(message, self());
+               } else if (message instanceof AcceptOffers) {
+                       acceptOffers((AcceptOffers) message);
+
+               // --- messages about tasks
+               } else if (message instanceof StatusUpdate) {
+                       taskStatusUpdated((StatusUpdate) message);
+               } else if (message instanceof 
ReconciliationCoordinator.Reconcile) {
+                       // a reconciliation request from a task
+                       reconciliationCoordinator.tell(message, self());
+               } else if (message instanceof TaskMonitor.TaskTerminated) {
+                       // a termination message from a task
+                       TaskMonitor.TaskTerminated msg = 
(TaskMonitor.TaskTerminated) message;
+                       taskTerminated(msg.taskID(), msg.status());
+
+               } else  {
+                       // message handled by the generic resource master code
+                       super.handleMessage(message);
+               }
+       }
+
+       /**
+        * Called to shut down the cluster (not a failover situation).
+        *
+        * @param finalStatus The application status to report.
+        * @param optionalDiagnostics An optional diagnostics message.
+     */
+       @Override
+       protected void shutdownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+
+               LOG.info("Shutting down and unregistering as a Mesos 
framework.");
+               try {
+                       // unregister the framework, which implicitly removes 
all tasks.
+                       schedulerDriver.stop(false);
+               }
+               catch(Exception ex) {
+                       LOG.warn("unable to unregister the framework", ex);
+               }
+
+               try {
+                       workerStore.cleanup();
+               }
+               catch(Exception ex) {
+                       LOG.warn("unable to cleanup the ZooKeeper state", ex);
+               }
+
+               context().stop(self());
+       }
+
+       @Override
+       protected void fatalError(String message, Throwable error) {
+               // we do not unregister, but cause a hard fail of this process, 
to have it
+               // restarted by the dispatcher
+               LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + 
message, error);
+               LOG.error("Shutting down process");
+
+               // kill this process, this will make an external supervisor 
(the dispatcher) restart the process
+               System.exit(EXIT_CODE_FATAL_ERROR);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Worker Management
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Recover framework/worker information persisted by a prior 
incarnation of the RM.
+        */
+       private void recoverWorkers() throws Exception {
+               // if this application master starts as part of an 
ApplicationMaster/JobManager recovery,
+               // then some worker tasks are most likely still alive and we 
can re-obtain them
+               final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = 
workerStore.recoverWorkers();
+
+               if (!tasksFromPreviousAttempts.isEmpty()) {
+                       LOG.info("Retrieved {} TaskManagers from previous 
attempt", tasksFromPreviousAttempts.size());
+
+                       List<Tuple2<TaskRequest,String>> toAssign = new 
ArrayList<>(tasksFromPreviousAttempts.size());
+                       List<LaunchableTask> toLaunch = new 
ArrayList<>(tasksFromPreviousAttempts.size());
+
+                       for (final MesosWorkerStore.Worker worker : 
tasksFromPreviousAttempts) {
+                               LaunchableMesosWorker launchable = 
createLaunchableMesosWorker(worker.taskID());
+
+                               switch(worker.state()) {
+                                       case New:
+                                               
workersInNew.put(extractResourceID(worker.taskID()), worker);
+                                               toLaunch.add(launchable);
+                                               break;
+                                       case Launched:
+                                               
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+                                               toAssign.add(new 
Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
+                                               break;
+                                       case Released:
+                                               
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+                                               break;
+                               }
+                               taskRouter.tell(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+                       }
+
+                       // tell the launch coordinator about prior assignments
+                       if(toAssign.size() >= 1) {
+                               launchCoordinator.tell(new 
LaunchCoordinator.Assign(toAssign), self());
+                       }
+                       // tell the launch coordinator to launch any new tasks
+                       if(toLaunch.size() >= 1) {
+                               launchCoordinator.tell(new 
LaunchCoordinator.Launch(toLaunch), self());
+                       }
+               }
+       }
+
+       /**
+        * Plan for some additional workers to be launched.
+        *
+        * @param numWorkers The number of workers to allocate.
+     */
+       @Override
+       protected void requestNewWorkers(int numWorkers) {
+
+               try {
+                       List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new 
ArrayList<>(numWorkers);
+                       List<LaunchableTask> toLaunch = new 
ArrayList<>(numWorkers);
+
+                       // generate new workers into persistent state and 
launch associated actors
+                       for (int i = 0; i < numWorkers; i++) {
+                               MesosWorkerStore.Worker worker = 
MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
+                               workerStore.putWorker(worker);
+                               
workersInNew.put(extractResourceID(worker.taskID()), worker);
+
+                               LaunchableMesosWorker launchable = 
createLaunchableMesosWorker(worker.taskID());
+
+                               LOG.info("Scheduling Mesos task {} with ({} MB, 
{} cpus).",
+                                       launchable.taskID().getValue(), 
launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
+
+                               toMonitor.add(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+                               toLaunch.add(launchable);
+                       }
+
+                       // tell the task router about the new plans
+                       for (TaskMonitor.TaskGoalStateUpdated update : 
toMonitor) {
+                               taskRouter.tell(update, self());
+                       }
+
+                       // tell the launch coordinator to launch the new tasks
+                       if(toLaunch.size() >= 1) {
+                               launchCoordinator.tell(new 
LaunchCoordinator.Launch(toLaunch), self());
+                       }
+               }
+               catch(Exception ex) {
+                       fatalError("unable to request new workers", ex);
+               }
+       }
+
+       /**
+        * Accept offers as advised by the launch coordinator.
+        *
+        * Acceptance is routed through the RM to update the persistent state 
before
+        * forwarding the message to Mesos.
+     */
+       private void acceptOffers(AcceptOffers msg) {
+
+               try {
+                       List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new 
ArrayList<>(msg.operations().size());
+
+                       // transition the persistent state of some tasks to 
Launched
+                       for (Protos.Offer.Operation op : msg.operations()) {
+                               if (op.getType() != 
Protos.Offer.Operation.Type.LAUNCH) {
+                                       continue;
+                               }
+                               for (Protos.TaskInfo info : 
op.getLaunch().getTaskInfosList()) {
+                                       MesosWorkerStore.Worker worker = 
workersInNew.remove(extractResourceID(info.getTaskId()));
+                                       assert (worker != null);
+
+                                       worker = 
worker.launchTask(info.getSlaveId(), msg.hostname());
+                                       workerStore.putWorker(worker);
+                                       
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+
+                                       LOG.info("Launching Mesos task {} on 
host {}.",
+                                               worker.taskID().getValue(), 
worker.hostname().get());
+
+                                       toMonitor.add(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+                               }
+                       }
+
+                       // tell the task router about the new plans
+                       for (TaskMonitor.TaskGoalStateUpdated update : 
toMonitor) {
+                               taskRouter.tell(update, self());
+                       }
+
+                       // send the acceptance message to Mesos
+                       schedulerDriver.acceptOffers(msg.offerIds(), 
msg.operations(), msg.filters());
+               }
+               catch(Exception ex) {
+                       fatalError("unable to accept offers", ex);
+               }
+       }
+
+       /**
+        * Handle a task status change.
+     */
+       private void taskStatusUpdated(StatusUpdate message) {
+               taskRouter.tell(message, self());
+               reconciliationCoordinator.tell(message, self());
+               schedulerDriver.acknowledgeStatusUpdate(message.status());
+       }
+
+       /**
+        * Accept the given started worker into the internal state.
+        *
+        * @param resourceID The worker resource id
+        * @return A registered worker node record.
+        */
+       @Override
+       protected RegisteredMesosWorkerNode workerStarted(ResourceID 
resourceID) {
+               MesosWorkerStore.Worker inLaunch = 
workersInLaunch.remove(resourceID);
+               if (inLaunch == null) {
+                       // Worker was not in state "being launched", this can 
indicate that the TaskManager
+                       // in this worker was already registered or that the 
container was not started
+                       // by this resource manager. Simply ignore this 
resourceID.
+                       return null;
+               }
+               return new RegisteredMesosWorkerNode(inLaunch);
+       }
+
+       /**
+        * Accept the given registered workers into the internal state.
+        *
+        * @param toConsolidate The worker IDs known previously to the 
JobManager.
+        * @return A collection of registered worker node records.
+     */
+       @Override
+       protected Collection<RegisteredMesosWorkerNode> 
reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
+
+               // we check for each task manager if we recognize its Mesos 
task ID
+               List<RegisteredMesosWorkerNode> accepted = new 
ArrayList<>(toConsolidate.size());
+               for (ResourceID resourceID : toConsolidate) {
+                       MesosWorkerStore.Worker worker = 
workersInLaunch.remove(resourceID);
+                       if (worker != null) {
+                               LOG.info("Mesos worker consolidation recognizes 
TaskManager {}.", resourceID);
+                               accepted.add(new 
RegisteredMesosWorkerNode(worker));
+                       }
+                       else {
+                               if(isStarted(resourceID)) {
+                                       LOG.info("TaskManager {} has already 
been registered at the resource manager.", resourceID);
+                               }
+                               else {
+                                       LOG.info("Mesos worker consolidation 
does not recognize TaskManager {}.", resourceID);
+                               }
+                       }
+               }
+               return accepted;
+       }
+
+       /**
+        * Release the given pending worker.
+        */
+       @Override
+       protected void releasePendingWorker(ResourceID id) {
+               MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
+               if (worker != null) {
+                       releaseWorker(worker);
+               } else {
+                       LOG.error("Cannot find worker {} to release. Ignoring 
request.", id);
+               }
+       }
+
+       /**
+        * Release the given started worker.
+        */
+       @Override
+       protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
+               releaseWorker(worker.task());
+       }
+
+       /**
+        * Plan for the removal of the given worker.
+     */
+       private void releaseWorker(MesosWorkerStore.Worker worker) {
+               try {
+                       LOG.info("Releasing worker {}", worker.taskID());
+
+                       // update persistent state of worker to Released
+                       worker = worker.releaseTask();
+                       workerStore.putWorker(worker);
+                       
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+                       taskRouter.tell(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+
+                       if (worker.hostname().isDefined()) {
+                               // tell the launch coordinator that the task is 
being unassigned from the host, for planning purposes
+                               launchCoordinator.tell(new 
LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
+                       }
+               }
+               catch (Exception ex) {
+                       fatalError("unable to release worker", ex);
+               }
+       }
+
+       @Override
+       protected int getNumWorkerRequestsPending() {
+               return workersInNew.size();
+       }
+
+       @Override
+       protected int getNumWorkersPendingRegistration() {
+               return workersInLaunch.size();
+       }
+
	// ------------------------------------------------------------------------
	//  Callbacks from the Mesos Master
	// ------------------------------------------------------------------------
+
+       /**
+        * Called when connected to Mesos as a new framework.
+        */
+       private void registered(Registered message) {
+               connectionMonitor.tell(message, self());
+
+               try {
+                       
workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+               }
+               catch(Exception ex) {
+                       fatalError("unable to store the assigned framework ID", 
ex);
+                       return;
+               }
+
+               launchCoordinator.tell(message, self());
+               reconciliationCoordinator.tell(message, self());
+               taskRouter.tell(message, self());
+       }
+
+       /**
+        * Called when reconnected to Mesos following a failover event.
+        */
+       private void reregistered(ReRegistered message) {
+               connectionMonitor.tell(message, self());
+               launchCoordinator.tell(message, self());
+               reconciliationCoordinator.tell(message, self());
+               taskRouter.tell(message, self());
+       }
+
+       /**
+        * Called when disconnected from Mesos.
+        */
+       private void disconnected(Disconnected message) {
+               connectionMonitor.tell(message, self());
+               launchCoordinator.tell(message, self());
+               reconciliationCoordinator.tell(message, self());
+               taskRouter.tell(message, self());
+       }
+
+       /**
+        * Called when an error is reported by the scheduler callback.
+        */
+       private void error(String message) {
+               self().tell(new FatalErrorOccurred("Connection to Mesos 
failed", new Exception(message)), self());
+       }
+
	/**
	 * Invoked when a Mesos task reaches a terminal status.
	 *
	 * <p>The worker is first removed from the persistent worker store; a failure there is
	 * escalated via {@code fatalError} and processing stops.
	 *
	 * <p>If the task was being returned (released by this resource manager), its completion is
	 * only logged. Any other termination is treated as a failure. The worker is dropped from
	 * {@code workersInLaunch} if present; otherwise the generic logic is notified via
	 * {@code notifyWorkerFailed}. The failure is then counted and reported. If the failure budget
	 * ({@code maxFailedTasks}, when non-negative) is exceeded, a {@code StopCluster} message is
	 * sent to this actor and processing stops.
	 *
	 * <p>In every non-stopping case, a worker check is triggered so that missing workers can be
	 * requested again.
	 *
	 * @param taskID the ID of the terminated Mesos task
	 * @param status the terminal status reported by Mesos
	 */
	private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) {
		// this callback occurs for failed containers and for released containers alike

		final ResourceID id = extractResourceID(taskID);

		try {
			workerStore.removeWorker(taskID);
		}
		catch(Exception ex) {
			fatalError("unable to remove worker", ex);
			return;
		}

		// check if this is a failed task or a released task
		if (workersBeingReturned.remove(id) != null) {
			// regular finished worker that we released
			LOG.info("Worker {} finished successfully with diagnostics: {}",
				id, status.getMessage());
		} else {
			// failed worker, either at startup, or running
			final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
			if (launched != null) {
				LOG.info("Mesos task {} failed, with a TaskManager in launch or registration. " +
					"State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
				// we will trigger re-acquiring new workers at the end
			} else {
				// failed registered worker
				// NOTE(review): this branch handles every task that is neither being returned nor
				// tracked in workersInLaunch; confirm no other worker category can reach it.
				LOG.info("Mesos task {} failed, with a registered TaskManager. " +
					"State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());

				// notify the generic logic, which notifies the JobManager, etc.
				notifyWorkerFailed(id, "Mesos task " + id + " failed.  State: " + status.getState());
			}

			// general failure logging
			failedTasksSoFar++;

			String diagMessage = String.format("Diagnostics for task %s in state %s : " +
					"reason=%s message=%s",
				id, status.getState(), status.getReason(), status.getMessage());
			sendInfoMessage(diagMessage);

			LOG.info(diagMessage);
			LOG.info("Total number of failed tasks so far: " + failedTasksSoFar);

			// maxFailedTasks == -1 is infinite number of retries.
			// (the check below treats any negative value as "no limit")
			if (maxFailedTasks >= 0 && failedTasksSoFar > maxFailedTasks) {
				String msg = "Stopping Mesos session because the number of failed tasks ("
					+ failedTasksSoFar + ") exceeded the maximum failed tasks ("
					+ maxFailedTasks + "). This number is controlled by the '"
					+ ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. "
					+ "By default its the number of requested tasks.";

				LOG.error(msg);
				// ask this actor to stop the cluster with a FAILED status
				self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)),
					ActorRef.noSender());

				// no need to do anything else
				return;
			}
		}

		// in case failed containers were among the finished containers, make
		// sure we re-examine and request new ones
		triggerCheckWorkers();
	}
+
	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------
+
+       private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID 
taskID) {
+               LaunchableMesosWorker launchable =
+                       new LaunchableMesosWorker(taskManagerParameters, 
taskManagerLaunchContext, taskID);
+               return launchable;
+       }
+
+       /**
+        * Extracts a unique ResourceID from the Mesos task.
+        *
+        * @param taskId the Mesos TaskID
+        * @return The ResourceID for the container
+        */
+       static ResourceID extractResourceID(Protos.TaskID taskId) {
+               return new ResourceID(taskId.getValue());
+       }
+
+       /**
+        * Extracts the Mesos task goal state from the worker information.
+        * @param worker the persistent worker information.
+        * @return goal state information for the {@Link TaskMonitor}.
+     */
+       static TaskMonitor.TaskGoalState 
extractGoalState(MesosWorkerStore.Worker worker) {
+               switch(worker.state()) {
+                       case New: return new TaskMonitor.New(worker.taskID());
+                       case Launched: return new 
TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+                       case Released: return new 
TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+                       default: throw new IllegalArgumentException();
+               }
+       }
+
+       /**
+        * Creates the Fenzo optimizer (builder).
+        * The builder is an indirection to faciliate unit testing of the 
Launch Coordinator.
+     */
+       private static TaskSchedulerBuilder createOptimizer() {
+               return new TaskSchedulerBuilder() {
+                       TaskScheduler.Builder builder = new 
TaskScheduler.Builder();
+
+                       @Override
+                       public TaskSchedulerBuilder 
withLeaseRejectAction(Action1<VirtualMachineLease> action) {
+                               builder.withLeaseRejectAction(action);
+                               return this;
+                       }
+
+                       @Override
+                       public TaskScheduler build() {
+                               return builder.build();
+                       }
+               };
+       }
+
+       /**
+        * Creates the props needed to instantiate this actor.
+        *
+        * Rather than extracting and validating parameters in the constructor, 
this factory method takes
+        * care of that. That way, errors occur synchronously, and are not 
swallowed simply in a
+        * failed asynchronous attempt to start the actor.
+
+        * @param actorClass
+        *             The actor class, to allow overriding this actor with 
subclasses for testing.
+        * @param flinkConfig
+        *             The Flink configuration object.
+        * @param taskManagerParameters
+        *             The parameters for launching TaskManager containers.
+        * @param taskManagerLaunchContext
+        *             The parameters for launching the TaskManager processes 
in the TaskManager containers.
+        * @param numInitialTaskManagers
+        *             The initial number of TaskManagers to allocate.
+        * @param log
+        *             The logger to log to.
+        *
+        * @return The Props object to instantiate the 
MesosFlinkResourceManager actor.
+        */
+       public static Props createActorProps(Class<? extends 
MesosFlinkResourceManager> actorClass,
+                       Configuration flinkConfig,
+                       MesosConfiguration mesosConfig,
+                       MesosWorkerStore workerStore,
+                       LeaderRetrievalService leaderRetrievalService,
+                       MesosTaskManagerParameters taskManagerParameters,
+                       Protos.TaskInfo.Builder taskManagerLaunchContext,
+                       int numInitialTaskManagers,
+                       Logger log)
+       {
+               final int maxFailedTasks = flinkConfig.getInteger(
+                       ConfigConstants.MESOS_MAX_FAILED_TASKS, 
numInitialTaskManagers);
+               if (maxFailedTasks >= 0) {
+                       log.info("Mesos framework tolerates {} failed tasks 
before giving up",
+                               maxFailedTasks);
+               }
+
+               return Props.create(actorClass,
+                       flinkConfig,
+                       mesosConfig,
+                       workerStore,
+                       leaderRetrievalService,
+                       taskManagerParameters,
+                       taskManagerLaunchContext,
+                       maxFailedTasks,
+                       numInitialTaskManagers);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
new file mode 100644
index 0000000..b3956aa
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -0,0 +1,51 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+
+import static java.util.Objects.requireNonNull;
+
+public class MesosTaskManagerParameters {
+
+       private double cpus;
+
+       private ContaineredTaskManagerParameters containeredParameters;
+
+       public MesosTaskManagerParameters(double cpus, 
ContaineredTaskManagerParameters containeredParameters) {
+               requireNonNull(containeredParameters);
+               this.cpus = cpus;
+               this.containeredParameters = containeredParameters;
+       }
+
+       public double cpus() {
+               return cpus;
+       }
+
+       public ContaineredTaskManagerParameters containeredParameters() {
+               return containeredParameters;
+       }
+
+       @Override
+       public String toString() {
+               return "MesosTaskManagerParameters{" +
+                       "cpus=" + cpus +
+                       ", containeredParameters=" + containeredParameters +
+                       '}';
+       }
+
+       /**
+        * Create the Mesos TaskManager parameters.
+        * @param flinkConfig the TM configuration.
+        * @param containeredParameters additional containered parameters.
+     */
+       public static MesosTaskManagerParameters create(
+               Configuration flinkConfig,
+               ContaineredTaskManagerParameters containeredParameters) {
+
+               double cpus = 
flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS,
+                       Math.max(containeredParameters.numSlots(), 1.0));
+
+               return new MesosTaskManagerParameters(cpus, 
containeredParameters);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
new file mode 100644
index 0000000..5dfc75e
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -0,0 +1,99 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskManagerRunner {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskManagerRunner.class);
+
+       /** The process environment variables */
+       private static final Map<String, String> ENV = System.getenv();
+
+       public static void runTaskManager(String[] args, final Class<? extends 
TaskManager> taskManager) throws IOException {
+               EnvironmentInformation.logEnvironmentInfo(LOG, 
taskManager.getSimpleName(), args);
+               org.apache.flink.runtime.util.SignalHandler.register(LOG);
+
+               // try to parse the command line arguments
+               final Configuration configuration;
+               try {
+                       configuration = 
TaskManager.parseArgsAndLoadConfig(args);
+
+                       // add dynamic properties to TaskManager configuration.
+                       final Configuration dynamicProperties =
+                               
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+                       LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+                       configuration.addAll(dynamicProperties);
+               }
+               catch (Throwable t) {
+                       LOG.error(t.getMessage(), t);
+                       System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+                       return;
+               }
+
+               // read the environment variables
+               final Map<String, String> envs = System.getenv();
+               final String effectiveUsername = 
envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+               final String tmpDirs = 
envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+               // configure local directory
+               String flinkTempDirs = 
configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+               if (flinkTempDirs != null) {
+                       LOG.info("Overriding Mesos temporary file directories 
with those " +
+                               "specified in the Flink config: " + 
flinkTempDirs);
+               }
+               else if (tmpDirs != null) {
+                       LOG.info("Setting directories for temporary files to: " 
+ tmpDirs);
+                       
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
+               }
+
+               LOG.info("Mesos task runs as '" + 
UserGroupInformation.getCurrentUser().getShortUserName() +
+                       "', setting user to execute Flink TaskManager to '" + 
effectiveUsername + "'");
+
+               // tell akka to die in case of an error
+               
configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
+               UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(effectiveUsername);
+               for (Token<? extends TokenIdentifier> toks : 
UserGroupInformation.getCurrentUser().getTokens()) {
+                       ugi.addToken(toks);
+               }
+
+               // Infer the resource identifier from the environment variable
+               String containerID = 
Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID));
+               final ResourceID resourceId = new ResourceID(containerID);
+               LOG.info("ResourceID assigned for this container: {}", 
resourceId);
+
+               ugi.doAs(new PrivilegedAction<Object>() {
+                       @Override
+                       public Object run() {
+                               try {
+                                       
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
taskManager);
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Error while starting the 
TaskManager", t);
+                                       
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+                               }
+                               return null;
+                       }
+               });
+       }
+}

Reply via email to