[FLINK-1984] Mesos ResourceManager - T1 milestone Implemented Mesos AppMaster including: - runners for AppMaster and TaskManager - MesosFlinkResourceManager as a Mesos framework - ZK persistent storage for Mesos tasks - reusable scheduler actors for: - offer handling using Netflix Fenzo (LaunchCoordinator) - reconciliation (ReconciliationCoordinator) - task monitoring (TaskMonitor) - connection monitoring (ConnectionMonitor) - lightweight HTTP server to serve artifacts to the Mesos fetcher (ArtifactServer) - scenario-based logging for: - connectivity issues - offer handling (receive, process, decline, rescind, accept) - incorporated FLINK-4152, FLINK-3904, FLINK-4141, FLINK-3675, FLINK-4166
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9b2be05 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9b2be05 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9b2be05 Branch: refs/heads/master Commit: d9b2be054f7dadf902d74622352e3ec8dfdcd584 Parents: 578e80e Author: wrighe3 <eron.wri...@emc.com> Authored: Thu Jul 14 00:12:49 2016 -0700 Committer: Maximilian Michels <m...@apache.org> Committed: Mon Aug 29 17:27:10 2016 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 76 ++ flink-dist/pom.xml | 7 +- flink-mesos/pom.xml | 294 ++++++++ .../main/java/org/apache/flink/mesos/Utils.java | 49 ++ .../flink/mesos/cli/FlinkMesosSessionCli.java | 41 ++ .../clusterframework/LaunchableMesosWorker.java | 187 +++++ .../MesosApplicationMasterRunner.java | 600 +++++++++++++++ .../clusterframework/MesosConfigKeys.java | 26 + .../MesosFlinkResourceManager.java | 737 +++++++++++++++++++ .../MesosTaskManagerParameters.java | 51 ++ .../MesosTaskManagerRunner.java | 99 +++ .../RegisteredMesosWorkerNode.scala | 15 + .../store/MesosWorkerStore.java | 134 ++++ .../store/StandaloneMesosWorkerStore.java | 69 ++ .../store/ZooKeeperMesosWorkerStore.java | 272 +++++++ .../flink/mesos/scheduler/LaunchableTask.java | 24 + .../flink/mesos/scheduler/SchedulerProxy.java | 87 +++ .../mesos/scheduler/TaskSchedulerBuilder.java | 16 + .../mesos/scheduler/messages/AcceptOffers.java | 56 ++ .../mesos/scheduler/messages/Connected.java | 8 + .../mesos/scheduler/messages/Disconnected.java | 12 + .../flink/mesos/scheduler/messages/Error.java | 24 + .../scheduler/messages/OfferRescinded.java | 26 + .../mesos/scheduler/messages/ReRegistered.java | 30 + .../mesos/scheduler/messages/Registered.java | 39 + .../scheduler/messages/ResourceOffers.java | 30 + .../mesos/scheduler/messages/SlaveLost.java | 26 + .../mesos/scheduler/messages/StatusUpdate.java | 
27 + .../flink/mesos/util/MesosArtifactServer.java | 286 +++++++ .../flink/mesos/util/MesosConfiguration.java | 108 +++ .../apache/flink/mesos/util/ZooKeeperUtils.java | 22 + flink-mesos/src/main/resources/log4j.properties | 27 + .../clusterframework/MesosJobManager.scala | 66 ++ .../clusterframework/MesosTaskManager.scala | 47 ++ .../mesos/scheduler/ConnectionMonitor.scala | 108 +++ .../mesos/scheduler/LaunchCoordinator.scala | 331 +++++++++ .../scheduler/ReconciliationCoordinator.scala | 164 +++++ .../flink/mesos/scheduler/TaskMonitor.scala | 240 ++++++ .../apache/flink/mesos/scheduler/Tasks.scala | 96 +++ .../ContaineredJobManager.scala | 174 +++++ .../MesosFlinkResourceManagerTest.java | 697 ++++++++++++++++++ .../src/test/resources/log4j-test.properties | 32 + flink-mesos/src/test/resources/logback-test.xml | 37 + .../scala/org/apache/flink/mesos/Utils.scala | 34 + .../mesos/scheduler/LaunchCoordinatorTest.scala | 421 +++++++++++ .../ReconciliationCoordinatorTest.scala | 214 ++++++ .../flink/mesos/scheduler/TaskMonitorTest.scala | 237 ++++++ .../org/apache/flink/runtime/akka/FSMSpec.scala | 40 + .../flink/runtime/util/ZooKeeperUtils.java | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 3 +- pom.xml | 1 + 51 files changed, 6446 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 514c730..2fe27e0 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -426,6 +426,60 @@ public final class ConfigConstants { public 
static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port"; + // ------------------------ Mesos Configuration ------------------------ + + /** + * The maximum number of failed Mesos tasks before entirely stopping + * the Mesos session / job on Mesos. + * + * By default, we take the number of initially requested tasks. + */ + public static final String MESOS_MAX_FAILED_TASKS = "mesos.maximum-failed-tasks"; + + /** + * The Mesos master URL. + * + * The value should be in one of the following forms: + * <pre> + * {@code + * host:port + * zk://host1:port1,host2:port2,.../path + * zk://username:password@host1:port1,host2:port2,.../path + * file:///path/to/file (where file contains one of the above) + * } + * </pre> + * + */ + public static final String MESOS_MASTER_URL = "mesos.master"; + + /** + * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down. + * + * The default value is 600 (seconds). + */ + public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = "mesos.failover-timeout"; + + /** + * The config parameter defining the Mesos artifact server port to use. + * Setting the port to 0 will let the OS choose an available port. + */ + public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = "mesos.resourcemanager.artifactserver.port"; + + public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "mesos.resourcemanager.framework.name"; + + public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "mesos.resourcemanager.framework.role"; + + public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = "mesos.resourcemanager.framework.principal"; + + public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret"; + + /** + * The cpus to acquire from Mesos. + * + * By default, we use the number of requested task slots. 
+ */ + public static final String MESOS_RESOURCEMANAGER_TASKS_CPUS = "mesos.resourcemanager.tasks.cpus"; + // ------------------------ Hadoop Configuration ------------------------ /** @@ -736,6 +790,9 @@ public final class ConfigConstants { @Deprecated public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter"; + /** ZooKeeper root path (ZNode) for Mesos workers. */ + public static final String ZOOKEEPER_MESOS_WORKERS_PATH = "recovery.zookeeper.path.mesos-workers"; + /** Deprecated in favour of {@link #HA_ZOOKEEPER_SESSION_TIMEOUT}. */ @Deprecated public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout"; @@ -983,6 +1040,23 @@ public final class ConfigConstants { */ public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0"; + // ------ Mesos-Specific Configuration ------ + + /** The default failover timeout provided to Mesos (10 mins) */ + public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60; + + /** + * The default network port to listen on for the Mesos artifact server. + */ + public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0; + + /** + * The default Mesos framework name for the ResourceManager to use. 
+ */ + public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "Flink"; + + public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*"; + // ------------------------ File System Behavior ------------------------ /** @@ -1131,6 +1205,8 @@ public final class ConfigConstants { public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter"; + public static final String DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH = "/mesos-workers"; + public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000; public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000; http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 539ca8e..ec84adc 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -119,8 +119,13 @@ under the License. <artifactId>flink-metrics-jmx</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-mesos_2.10</artifactId> + <version>${project.version}</version> + </dependency> - </dependencies> <!-- See main pom.xml for explanation of profiles --> http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/pom.xml ---------------------------------------------------------------------- diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml new file mode 100644 index 0000000..c344ab2 --- /dev/null +++ b/flink-mesos/pom.xml @@ -0,0 +1,294 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-mesos_2.10</artifactId> + <name>flink-mesos</name> + <packaging>jar</packaging> + + <properties> + <mesos.version>0.27.1</mesos.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>hadoop-core</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.10</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_${scala.binary.version}</artifactId> + </dependency> + + <dependency> + 
<groupId>com.typesafe.akka</groupId> + <artifactId>akka-remote_${scala.binary.version}</artifactId> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-slf4j_${scala.binary.version}</artifactId> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-testkit_${scala.binary.version}</artifactId> + </dependency> + + <!--<dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-camel_${scala.binary.version}</artifactId> + </dependency>--> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-curator-recipes</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- =================================================== + Dependencies for Mesos + =================================================== --> + + <dependency> + <groupId>org.apache.mesos</groupId> + <artifactId>mesos</artifactId> + <version>${mesos.version}</version> + </dependency> + + <dependency> + <groupId>com.netflix.fenzo</groupId> + <artifactId>fenzo-core</artifactId> + <version>0.9.3</version> + </dependency> + + <dependency> + <groupId>tv.cntt</groupId> + <artifactId>netty-router</artifactId> + <version>1.10</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- 
Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + 
<groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Scala Code Style, most of the configuration done via plugin management --> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <configuration> + <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation> + </configuration> + </plugin> + + <!-- Relocate curator --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes combine.children="append"> + <include>org.apache.flink:flink-shaded-curator-recipes</include> + </includes> + </artifactSet> + <relocations combine.children="append"> + <relocation> + <pattern>org.apache.curator</pattern> + <shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java ---------------------------------------------------------------------- diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java new file mode 100644 index 0000000..2509465 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java @@ -0,0 +1,49 @@ +package org.apache.flink.mesos; + +import org.apache.mesos.Protos; + +import java.net.URL; +import java.util.Arrays; + +public class Utils { + /** + * Construct a Mesos environment variable. + */ + public static Protos.Environment.Variable variable(String name, String value) { + return Protos.Environment.Variable.newBuilder() + .setName(name) + .setValue(value) + .build(); + } + + /** + * Construct a Mesos URI. + */ + public static Protos.CommandInfo.URI uri(URL url, boolean cacheable) { + return Protos.CommandInfo.URI.newBuilder() + .setValue(url.toExternalForm()) + .setExtract(false) + .setCache(cacheable) + .build(); + } + + public static Protos.Resource scalar(String name, double value) { + return Protos.Resource.newBuilder() + .setName(name) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)) + .build(); + } + + public static Protos.Value.Range range(long begin, long end) { + return Protos.Value.Range.newBuilder().setBegin(begin).setEnd(end).build(); + } + + public static Protos.Resource ranges(String name, Protos.Value.Range... 
ranges) { + return Protos.Resource.newBuilder() + .setName(name) + .setType(Protos.Value.Type.RANGES) + .setRanges(Protos.Value.Ranges.newBuilder().addAllRange(Arrays.asList(ranges)).build()) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java new file mode 100644 index 0000000..b767344 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java @@ -0,0 +1,41 @@ +package org.apache.flink.mesos.cli; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.Configuration; + +import java.io.IOException; +import java.util.Map; + +public class FlinkMesosSessionCli { + + private static final ObjectMapper mapper = new ObjectMapper(); + + public static Configuration decodeDynamicProperties(String dynamicPropertiesEncoded) { + try { + Configuration configuration = new Configuration(); + if(dynamicPropertiesEncoded != null) { + TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {}; + Map<String,String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef); + for (Map.Entry<String, String> property : props.entrySet()) { + configuration.setString(property.getKey(), property.getValue()); + } + } + return configuration; + } + catch(IOException ex) { + throw new IllegalArgumentException("unreadable encoded properties", ex); + } + } + + public static String encodeDynamicProperties(Configuration configuration) { + try { + String dynamicPropertiesEncoded = mapper.writeValueAsString(configuration.toMap()); + return 
dynamicPropertiesEncoded; + } + catch (JsonProcessingException ex) { + throw new IllegalArgumentException("unwritable properties", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java new file mode 100644 index 0000000..8abd79a --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -0,0 +1,187 @@ +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. + */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** + * The set of configuration keys to be dynamically configured with a port allocated from Mesos. 
+ */ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** + * Construct a launchable Mesos worker. + * @param params the TM parameters such as memory, cpu to acquire. + * @param template a template for the TaskInfo to be constructed at launch time. + * @param taskID the taskID for this worker. + */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference<TaskRequest.AssignedResources> assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; + } + + @Override + public int getPorts() { + return TM_PORT_KEYS.length; + } + + @Override + public Map<String, NamedResourceSetRequest> getCustomNamedResources() { + return Collections.emptyMap(); + } + + @Override + public List<? extends ConstraintEvaluator> getHardConstraints() { + return null; + } + + @Override + public List<? 
extends VMTaskFitnessCalculator> getSoftConstraints() { + return null; + } + + @Override + public void setAssignedResources(AssignedResources assignedResources) { + this.assignedResources.set(assignedResources); + } + + @Override + public AssignedResources getAssignedResources() { + return assignedResources.get(); + } + + @Override + public String toString() { + return "Request{" + + "cpus=" + getCPUs() + + "memory=" + getMemory() + + '}'; + } + } + + /** + * Construct the TaskInfo needed to launch the worker. + * @param slaveId the assigned slave. + * @param assignment the assignment details. + * @return a fully-baked TaskInfo. + */ + @Override + public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assignment) { + + final Configuration dynamicProperties = new Configuration(); + + // specialize the TaskInfo template with assigned resources, environment variables, etc + final Protos.TaskInfo.Builder taskInfo = template + .clone() + .setSlaveId(slaveId) + .setTaskId(taskID) + .setName(taskID.getValue()) + .addResources(scalar("cpus", assignment.getRequest().getCPUs())) + .addResources(scalar("mem", assignment.getRequest().getMemory())); + //.addResources(scalar("disk", assignment.getRequest.getDisk).setRole("Flink")) + + // use the assigned ports for the TM + if (assignment.getAssignedPorts().size() != TM_PORT_KEYS.length) { + throw new IllegalArgumentException("unsufficient # of ports assigned"); + } + for (int i = 0; i < TM_PORT_KEYS.length; i++) { + int port = assignment.getAssignedPorts().get(i); + String key = TM_PORT_KEYS[i]; + taskInfo.addResources(ranges("ports", range(port, port))); + dynamicProperties.setInteger(key, port); + } + + // finalize environment variables + final Protos.Environment.Builder environmentBuilder = taskInfo.getCommandBuilder().getEnvironmentBuilder(); + + // propagate the Mesos task ID to the TM + environmentBuilder + .addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue())); 
+ + // propagate the dynamic configuration properties to the TM + String dynamicPropertiesEncoded = FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties); + environmentBuilder + .addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded)); + + return taskInfo.build(); + } + + @Override + public String toString() { + return "LaunchableMesosWorker{" + + "taskID=" + taskID + + "taskRequest=" + taskRequest + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java new file mode 100644 index 0000000..30f2258 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -0,0 +1,600 @@ +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; 
+import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. 
+ */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, + * before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + // ------------------------------------------------------------------------ + // Program entry point + // ------------------------------------------------------------------------ + + /** + * The entry point for the Mesos AppMaster. + * + * @param args The command line arguments. + */ + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", args); + SignalHandler.register(LOG); + + // run and exit with the proper return code + int returnCode = new MesosApplicationMasterRunner().run(args); + System.exit(returnCode); + } + + /** + * The instance entry point for the Mesos AppMaster. Obtains user group + * information and calls the main work method {@link #runPrivileged()} as a + * privileged action. + * + * @param args The command line arguments. + * @return The process exit code. 
+ */ + protected int run(String[] args) { + try { + LOG.debug("All environment variables: {}", ENV); + + final UserGroupInformation currentUser; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (Throwable t) { + throw new Exception("Cannot access UserGroupInformation information for current user", t); + } + + LOG.info("Running Flink as user {}", currentUser.getShortUserName()); + + // run the actual work in a secured privileged action + return currentUser.doAs(new PrivilegedAction<Integer>() { + @Override + public Integer run() { + return runPrivileged(); + } + }); + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("Mesos AppMaster initialization failed", t); + return INIT_ERROR_EXIT_CODE; + } + } + + // ------------------------------------------------------------------------ + // Core work method + // ------------------------------------------------------------------------ + + /** + * The main work method, must run as a privileged action. + * + * @return The return code for the Java process. + */ + protected int runPrivileged() { + + ActorSystem actorSystem = null; + WebMonitor webMonitor = null; + MesosArtifactServer artifactServer = null; + + try { + // ------- (1) load and parse / validate all configurations ------- + + // loading all config values here has the advantage that the program fails fast, if any + // configuration problem occurs + + final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX); + require(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX); + + final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID); + require(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID); + + // Note that we use the "appMasterHostname" given by the system, to make sure + // we use the hostnames consistently throughout akka. + // for akka "localhost" and "localhost.localdomain" are different actors. 
+ final String appMasterHostname = InetAddress.getLocalHost().getHostName(); + + // Flink configuration + final Configuration dynamicProperties = + FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + + final Configuration config = createConfiguration(workingDir, dynamicProperties); + + // Mesos configuration + final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname); + + // environment values related to TM + final int taskManagerContainerMemory; + final int numInitialTaskManagers; + final int slotsPerTaskManager; + + // on a parse failure, keep the NumberFormatException as the cause so its stack trace is not lost + try { + taskManagerContainerMemory = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_MEMORY + " : " + + e.getMessage(), e); + } + try { + numInitialTaskManagers = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_COUNT + " : " + + e.getMessage(), e); + } + try { + slotsPerTaskManager = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_SLOTS + " : " + + e.getMessage(), e); + } + + final ContaineredTaskManagerParameters containeredParameters = + ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager); + + final MesosTaskManagerParameters taskManagerParameters = + MesosTaskManagerParameters.create(config, containeredParameters); + + LOG.info("TaskManagers will be created with {} task slots", + taskManagerParameters.containeredParameters().numSlots()); + LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " + + "JVM direct memory limit {} MB, {} cpus", + 
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(), + taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(), + taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(), + taskManagerParameters.cpus()); + + // JM endpoint, which should be explicitly configured by the dispatcher (based on acquired net resources) + final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); + // the largest valid port number is 65535 + require(listeningPort >= 0 && listeningPort <= 65535, "Config parameter \"" + + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" is invalid, it must be between 0 and 65535"); + + // ----------------- (2) start the actor system ------------------- + + // try to start the actor system, JobManager and JobManager actor system + // using the configured address and ports + actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, listeningPort, LOG); + + final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get(); + final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get(); + + LOG.info("Actor system bound to hostname {}.", akkaHostname); + + // try to start the artifact server + LOG.debug("Starting Artifact Server"); + final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY, + ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT); + artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort); + + // ----------------- (3) Generate the configuration for the TaskManagers ------------------- + + final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( + config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT); + LOG.debug("TaskManager configuration: {}", taskManagerConfig); + + final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext( + config, mesosConfig, ENV, + 
taskManagerParameters, taskManagerConfig, + workingDir, getTaskManagerClass(), artifactServer, LOG); + + // ----------------- (4) start the actors ------------------- + + // 1) JobManager & Archive (in non-HA case, the leader service takes this) + // 2) Web Monitor (we need its port to register) + // 3) Resource Master for Mesos + // 4) Process reapers for the JobManager and Resource Master + + // 1: the JobManager + LOG.debug("Starting JobManager actor"); + + // we start the JobManager with its standard name + ActorRef jobManager = JobManager.startJobManagerActors( + config, actorSystem, + new scala.Some<>(JobManager.JOB_MANAGER_NAME()), + scala.Option.<String>empty(), + getJobManagerClass(), + getArchivistClass())._1(); + + + // 2: the web monitor + LOG.debug("Starting Web Frontend"); + + webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG); + if(webMonitor != null) { + final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/"); + mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm()); + } + + // 3: Flink's Mesos ResourceManager + LOG.debug("Starting Mesos Flink Resource Manager"); + + // create the worker store to persist task information across restarts + MesosWorkerStore workerStore = createWorkerStore(config); + + // we need the leader retrieval service here to be informed of new + // leader session IDs, even though there can be only one leader ever + LeaderRetrievalService leaderRetriever = + LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager); + + Props resourceMasterProps = MesosFlinkResourceManager.createActorProps( + getResourceManagerClass(), + config, + mesosConfig, + workerStore, + leaderRetriever, + taskManagerParameters, + taskManagerContext, + numInitialTaskManagers, + LOG); + + ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master"); + + + // 4: Process reapers + // The process reapers ensure that upon 
unexpected actor death, the process exits + // and does not stay lingering around unresponsive + + LOG.debug("Starting process reapers for JobManager"); + + actorSystem.actorOf( + Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE), + "Mesos_Resource_Master_Process_Reaper"); + + actorSystem.actorOf( + Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE), + "JobManager_Process_Reaper"); + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("Mesos JobManager initialization failed", t); + + if (actorSystem != null) { + try { + actorSystem.shutdown(); + } catch (Throwable tt) { + LOG.error("Error shutting down actor system", tt); + } + } + + if (webMonitor != null) { + try { + webMonitor.stop(); + } catch (Throwable ignored) { + LOG.warn("Failed to stop the web frontend", ignored); + } + } + + if(artifactServer != null) { + try { + artifactServer.stop(); + } catch (Throwable ignored) { + LOG.error("Failed to stop the artifact server", ignored); + } + } + + return INIT_ERROR_EXIT_CODE; + } + + // everything started, we can wait until all is done or the process is killed + LOG.info("Mesos JobManager started"); + + // wait until everything is done + actorSystem.awaitTermination(); + + // if we get here, everything work out jolly all right, and we even exited smoothly + if (webMonitor != null) { + try { + webMonitor.stop(); + } catch (Throwable t) { + LOG.error("Failed to stop the web frontend", t); + } + } + + try { + artifactServer.stop(); + } catch (Throwable t) { + LOG.error("Failed to stop the artifact server", t); + } + + return 0; + } + + // ------------------------------------------------------------------------ + // For testing, this allows to override the actor classes used for + // JobManager and the archive of completed jobs + // ------------------------------------------------------------------------ + + protected Class<? 
extends MesosFlinkResourceManager> getResourceManagerClass() { + return MesosFlinkResourceManager.class; + } + + protected Class<? extends JobManager> getJobManagerClass() { + return MesosJobManager.class; + } + + protected Class<? extends MemoryArchivist> getArchivistClass() { + return MemoryArchivist.class; + } + + protected Class<? extends TaskManager> getTaskManagerClass() { + return MesosTaskManager.class; + } + + /** + * Validates a condition, throwing a RuntimeException if the condition is violated. + * + * @param condition The condition. + * @param message The message for the runtime exception, with format variables as defined by + * {@link String#format(String, Object...)}. + * @param values The format arguments. + */ + private static void require(boolean condition, String message, Object... values) { + if (!condition) { + throw new RuntimeException(String.format(message, values)); + } + } + + /** + * Loads the Flink configuration from the given directory, records that directory as the + * Flink base directory, and adds the given additional (dynamic) properties to the result. + * + * @param baseDirectory The directory to load the configuration from. + * @param additional The additional properties to add to the loaded configuration. + * + * @return The loaded configuration, including the base directory entry and the additional properties. + */ + @SuppressWarnings("deprecation") + private static Configuration createConfiguration(String baseDirectory, Configuration additional) { + LOG.info("Loading config from directory " + baseDirectory); + + Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory); + + configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory); + + // add dynamic properties to JobManager configuration. + configuration.addAll(additional); + + return configuration; + } + + /** + * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration. 
+ */ + public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) { + + Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() + .setUser("") + .setHostname(hostname); + Protos.Credential.Builder credential = null; + + if(!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) { + throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured."); + } + String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null); + + Duration failoverTimeout = FiniteDuration.apply( + flinkConfig.getInteger( + ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS, + ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS), + TimeUnit.SECONDS); + frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds()); + + frameworkInfo.setName(flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME, + ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME)); + + frameworkInfo.setRole(flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE, + ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE)); + + if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { + frameworkInfo.setPrincipal(flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null)); + + credential = Protos.Credential.newBuilder(); + credential.setPrincipal(frameworkInfo.getPrincipal()); + + if(!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) { + throw new IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET + " must be configured."); + } + credential.setSecret(flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null)); + } + + MesosConfiguration mesos = + new MesosConfiguration(masterUrl, frameworkInfo, Option.apply(credential)); + + return mesos; + } + + private MesosWorkerStore createWorkerStore(Configuration 
flinkConfig) throws Exception { + MesosWorkerStore workerStore; + RecoveryMode recoveryMode = RecoveryMode.fromConfig(flinkConfig); + if (recoveryMode == RecoveryMode.STANDALONE) { + workerStore = new StandaloneMesosWorkerStore(); + } + else if (recoveryMode == RecoveryMode.ZOOKEEPER) { + // note: the store is responsible for closing the client. + CuratorFramework client = ZooKeeperUtils.startCuratorFramework(flinkConfig); + workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig); + } + else { + // any other recovery mode has no matching worker store + throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode + "'."); + } + + return workerStore; + } + + /** + * Creates a Mesos task info template, which describes how to bring up a TaskManager process as + * a Mesos task. + * + * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager + * needs (such as JAR file, config file, ...) and all environment variables in a task info record. + * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory. + * A lightweight HTTP server serves the artifacts to the fetcher. + * + * <p>We do this work before we start the ResourceManager actor in order to fail early if + * any of the operations here fail. + * + * @param flinkConfig + * The Flink configuration object. + * @param mesosConfig + * The Mesos configuration object. + * @param env + * The environment variables. + * @param tmParams + * The TaskManager container memory parameters. + * @param taskManagerConfig + * The configuration for the TaskManagers. + * @param workingDirectory + * The current application master container's working directory. + * @param taskManagerMainClass + * The class with the main method. + * @param artifactServer + * The artifact server. + * @param log + * The logger. + * + * @return The task info template for the TaskManager processes. 
+ * + * @throws Exception Thrown if the task info could not be created, for example if + * the resources could not be copied. + */ + public static Protos.TaskInfo.Builder createTaskManagerContext( + Configuration flinkConfig, + MesosConfiguration mesosConfig, + Map<String, String> env, + MesosTaskManagerParameters tmParams, + Configuration taskManagerConfig, + String workingDirectory, + Class<?> taskManagerMainClass, + MesosArtifactServer artifactServer, + Logger log) throws Exception { + + + Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder(); + Protos.CommandInfo.Builder cmd = Protos.CommandInfo.newBuilder(); + + log.info("Setting up artifacts for TaskManagers"); + + String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES); + require(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES); + + String clientUsername = env.get(MesosConfigKeys.ENV_CLIENT_USERNAME); + require(clientUsername != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_USERNAME); + + String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH); + require(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH); + + // register the Flink jar + final File flinkJarFile = new File(workingDirectory, "flink.jar"); + cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true)); + + // register the TaskManager configuration + final File taskManagerConfigFile = + new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml"); + // use the caller-supplied logger, like the other log statements in this method + log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath()); + BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile); + cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, GlobalConfiguration.FLINK_CONF_FILENAME), true)); + + // prepare additional files to be shipped + for (String pathStr : shipListString.split(",")) { + if (!pathStr.isEmpty()) { + File shipFile 
= new File(workingDirectory, pathStr); + cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true)); + } + } + + log.info("Creating task info for TaskManagers"); + + // build the launch command + boolean hasLogback = new File(workingDirectory, "logback.xml").exists(); + boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists(); + + String launchCommand = BootstrapTools.getTaskManagerShellCommand( + flinkConfig, tmParams.containeredParameters(), ".", ".", + hasLogback, hasLog4j, taskManagerMainClass); + cmd.setValue(launchCommand); + + // build the environment variables + Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder(); + for (Map.Entry<String, String> entry : tmParams.containeredParameters().taskManagerEnv().entrySet()) { + envBuilder.addVariables(variable(entry.getKey(), entry.getValue())); + } + envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString)); + envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLIENT_USERNAME, clientUsername)); + + cmd.setEnvironment(envBuilder); + + info.setCommand(cmd); + + return info; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java new file mode 100644 index 0000000..3173286 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java @@ -0,0 +1,26 @@ +package org.apache.flink.mesos.runtime.clusterframework; + +/** + * The Mesos environment variables used for settings of the containers. 
+ */ +public class MesosConfigKeys { + // ------------------------------------------------------------------------ + // Environment variable names + // ------------------------------------------------------------------------ + + public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY"; + public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT"; + public static final String ENV_SLOTS = "_SLOTS"; + public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES"; + public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME"; + public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES"; + public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; + public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR"; + public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH"; + public static final String ENV_CLASSPATH = "CLASSPATH"; + public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX"; + public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID"; + + /** Private constructor to prevent instantiation */ + private MesosConfigKeys() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java new file mode 100644 index 0000000..483c7b7 --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java @@ -0,0 +1,737 @@ +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import 
com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map<ResourceID, MesosWorkerStore.Worker> workersInNew; + final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch; + final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned; + + /** The number of failed tasks since the master became active */ + private int failedTasksSoFar; + + public MesosFlinkResourceManager( + Configuration flinkConfig, + MesosConfiguration mesosConfig, + MesosWorkerStore workerStore, + LeaderRetrievalService leaderRetrievalService, + MesosTaskManagerParameters taskManagerParameters, + Protos.TaskInfo.Builder taskManagerLaunchContext, + int maxFailedTasks, + int numInitialTaskManagers) { + + super(numInitialTaskManagers, flinkConfig, leaderRetrievalService); + + this.mesosConfig = requireNonNull(mesosConfig); + + 
this.workerStore = requireNonNull(workerStore); + + this.taskManagerParameters = requireNonNull(taskManagerParameters); + this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext); + this.maxFailedTasks = maxFailedTasks; + + this.workersInNew = new HashMap<>(); + this.workersInLaunch = new HashMap<>(); + this.workersBeingReturned = new HashMap<>(); + } + + // ------------------------------------------------------------------------ + // Mesos-specific behavior + // ------------------------------------------------------------------------ + + @Override + protected void initialize() throws Exception { + LOG.info("Initializing Mesos resource master"); + + workerStore.start(); + + // create the scheduler driver to communicate with Mesos + schedulerCallbackHandler = new SchedulerProxy(self()); + + // register with Mesos + FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo() + .clone() + .setCheckpoint(true); + + Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID(); + if(frameworkID.isEmpty()) { + LOG.info("Registering as new framework."); + } + else { + LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue()); + frameworkInfo.setId(frameworkID.get()); + } + + MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo); + MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig); + schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false); + + // create supporting actors + connectionMonitor = createConnectionMonitor(); + launchCoordinator = createLaunchCoordinator(); + reconciliationCoordinator = createReconciliationCoordinator(); + taskRouter = createTaskRouter(); + + recoverWorkers(); + + connectionMonitor.tell(new ConnectionMonitor.Start(), self()); + schedulerDriver.start(); + } + + protected ActorRef createConnectionMonitor() { + return context().actorOf( + ConnectionMonitor.createActorProps(ConnectionMonitor.class, 
config), + "connectionMonitor"); + } + + protected ActorRef createTaskRouter() { + return context().actorOf( + Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class), + "tasks"); + } + + protected ActorRef createLaunchCoordinator() { + return context().actorOf( + LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()), + "launchCoordinator"); + } + + protected ActorRef createReconciliationCoordinator() { + return context().actorOf( + ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver), + "reconciliationCoordinator"); + } + + @Override + public void postStop() { + LOG.info("Stopping Mesos resource master"); + super.postStop(); + } + + // ------------------------------------------------------------------------ + // Actor messages + // ------------------------------------------------------------------------ + + @Override + protected void handleMessage(Object message) { + + // check for Mesos-specific actor messages first + + // --- messages about Mesos connection + if (message instanceof Registered) { + registered((Registered) message); + } else if (message instanceof ReRegistered) { + reregistered((ReRegistered) message); + } else if (message instanceof Disconnected) { + disconnected((Disconnected) message); + } else if (message instanceof Error) { + error(((Error) message).message()); + + // --- messages about offers + } else if (message instanceof ResourceOffers || message instanceof OfferRescinded) { + launchCoordinator.tell(message, self()); + } else if (message instanceof AcceptOffers) { + acceptOffers((AcceptOffers) message); + + // --- messages about tasks + } else if (message instanceof StatusUpdate) { + taskStatusUpdated((StatusUpdate) message); + } else if (message instanceof ReconciliationCoordinator.Reconcile) { + // a reconciliation request from a task + reconciliationCoordinator.tell(message, self()); + } else if (message 
instanceof TaskMonitor.TaskTerminated) { + // a termination message from a task + TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message; + taskTerminated(msg.taskID(), msg.status()); + + } else { + // message handled by the generic resource master code + super.handleMessage(message); + } + } + + /** + * Called to shut down the cluster (not a failover situation). + * + * @param finalStatus The application status to report. + * @param optionalDiagnostics An optional diagnostics message. + */ + @Override + protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + + LOG.info("Shutting down and unregistering as a Mesos framework."); + try { + // unregister the framework, which implicitly removes all tasks. + schedulerDriver.stop(false); + } + catch(Exception ex) { + LOG.warn("unable to unregister the framework", ex); + } + + try { + workerStore.cleanup(); + } + catch(Exception ex) { + LOG.warn("unable to cleanup the ZooKeeper state", ex); + } + + context().stop(self()); + } + + @Override + protected void fatalError(String message, Throwable error) { + // we do not unregister, but cause a hard fail of this process, to have it + // restarted by the dispatcher + LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error); + LOG.error("Shutting down process"); + + // kill this process, this will make an external supervisor (the dispatcher) restart the process + System.exit(EXIT_CODE_FATAL_ERROR); + } + + // ------------------------------------------------------------------------ + // Worker Management + // ------------------------------------------------------------------------ + + /** + * Recover framework/worker information persisted by a prior incarnation of the RM. 
+ */ + private void recoverWorkers() throws Exception { + // if this application master starts as part of an ApplicationMaster/JobManager recovery, + // then some worker tasks are most likely still alive and we can re-obtain them + final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers(); + + if (!tasksFromPreviousAttempts.isEmpty()) { + LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size()); + + List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size()); + List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size()); + + for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) { + LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); + + switch(worker.state()) { + case New: + workersInNew.put(extractResourceID(worker.taskID()), worker); + toLaunch.add(launchable); + break; + case Launched: + workersInLaunch.put(extractResourceID(worker.taskID()), worker); + toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get())); + break; + case Released: + workersBeingReturned.put(extractResourceID(worker.taskID()), worker); + break; + } + taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self()); + } + + // tell the launch coordinator about prior assignments + if(toAssign.size() >= 1) { + launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self()); + } + // tell the launch coordinator to launch any new tasks + if(toLaunch.size() >= 1) { + launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self()); + } + } + } + + /** + * Plan for some additional workers to be launched. + * + * @param numWorkers The number of workers to allocate. 
+ */ + @Override + protected void requestNewWorkers(int numWorkers) { + + try { + List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers); + List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers); + + // generate new workers into persistent state and launch associated actors + for (int i = 0; i < numWorkers; i++) { + MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newTask(workerStore.newTaskID()); + workerStore.putWorker(worker); + workersInNew.put(extractResourceID(worker.taskID()), worker); + + LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); + + LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).", + launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs()); + + toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker))); + toLaunch.add(launchable); + } + + // tell the task router about the new plans + for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) { + taskRouter.tell(update, self()); + } + + // tell the launch coordinator to launch the new tasks + if(toLaunch.size() >= 1) { + launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self()); + } + } + catch(Exception ex) { + fatalError("unable to request new workers", ex); + } + } + + /** + * Accept offers as advised by the launch coordinator. + * + * Acceptance is routed through the RM to update the persistent state before + * forwarding the message to Mesos. 
+ */ + private void acceptOffers(AcceptOffers msg) { + + try { + List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size()); + + // transition the persistent state of some tasks to Launched + for (Protos.Offer.Operation op : msg.operations()) { + if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) { + continue; + } + for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) { + MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId())); + assert (worker != null); + + worker = worker.launchTask(info.getSlaveId(), msg.hostname()); + workerStore.putWorker(worker); + workersInLaunch.put(extractResourceID(worker.taskID()), worker); + + LOG.info("Launching Mesos task {} on host {}.", + worker.taskID().getValue(), worker.hostname().get()); + + toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker))); + } + } + + // tell the task router about the new plans + for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) { + taskRouter.tell(update, self()); + } + + // send the acceptance message to Mesos + schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters()); + } + catch(Exception ex) { + fatalError("unable to accept offers", ex); + } + } + + /** + * Handle a task status change. + */ + private void taskStatusUpdated(StatusUpdate message) { + taskRouter.tell(message, self()); + reconciliationCoordinator.tell(message, self()); + schedulerDriver.acknowledgeStatusUpdate(message.status()); + } + + /** + * Accept the given started worker into the internal state. + * + * @param resourceID The worker resource id + * @return A registered worker node record. 
+ */ + @Override + protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) { + MesosWorkerStore.Worker inLaunch = workersInLaunch.remove(resourceID); + if (inLaunch == null) { + // Worker was not in state "being launched", this can indicate that the TaskManager + // in this worker was already registered or that the container was not started + // by this resource manager. Simply ignore this resourceID. + return null; + } + return new RegisteredMesosWorkerNode(inLaunch); + } + + /** + * Accept the given registered workers into the internal state. + * + * @param toConsolidate The worker IDs known previously to the JobManager. + * @return A collection of registered worker node records. + */ + @Override + protected Collection<RegisteredMesosWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) { + + // we check for each task manager if we recognize its Mesos task ID + List<RegisteredMesosWorkerNode> accepted = new ArrayList<>(toConsolidate.size()); + for (ResourceID resourceID : toConsolidate) { + MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID); + if (worker != null) { + LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID); + accepted.add(new RegisteredMesosWorkerNode(worker)); + } + else { + if(isStarted(resourceID)) { + LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID); + } + else { + LOG.info("Mesos worker consolidation does not recognize TaskManager {}.", resourceID); + } + } + } + return accepted; + } + + /** + * Release the given pending worker. + */ + @Override + protected void releasePendingWorker(ResourceID id) { + MesosWorkerStore.Worker worker = workersInLaunch.remove(id); + if (worker != null) { + releaseWorker(worker); + } else { + LOG.error("Cannot find worker {} to release. Ignoring request.", id); + } + } + + /** + * Release the given started worker. 
+ */ + @Override + protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) { + releaseWorker(worker.task()); + } + + /** + * Plan for the removal of the given worker. + */ + private void releaseWorker(MesosWorkerStore.Worker worker) { + try { + LOG.info("Releasing worker {}", worker.taskID()); + + // update persistent state of worker to Released + worker = worker.releaseTask(); + workerStore.putWorker(worker); + workersBeingReturned.put(extractResourceID(worker.taskID()), worker); + taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self()); + + if (worker.hostname().isDefined()) { + // tell the launch coordinator that the task is being unassigned from the host, for planning purposes + launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self()); + } + } + catch (Exception ex) { + fatalError("unable to release worker", ex); + } + } + + @Override + protected int getNumWorkerRequestsPending() { + return workersInNew.size(); + } + + @Override + protected int getNumWorkersPendingRegistration() { + return workersInLaunch.size(); + } + + // ------------------------------------------------------------------------ + // Callbacks from the Mesos Master + // ------------------------------------------------------------------------ + + /** + * Called when connected to Mesos as a new framework. + */ + private void registered(Registered message) { + connectionMonitor.tell(message, self()); + + try { + workerStore.setFrameworkID(Option.apply(message.frameworkId())); + } + catch(Exception ex) { + fatalError("unable to store the assigned framework ID", ex); + return; + } + + launchCoordinator.tell(message, self()); + reconciliationCoordinator.tell(message, self()); + taskRouter.tell(message, self()); + } + + /** + * Called when reconnected to Mesos following a failover event. 
+ */ + private void reregistered(ReRegistered message) { + connectionMonitor.tell(message, self()); + launchCoordinator.tell(message, self()); + reconciliationCoordinator.tell(message, self()); + taskRouter.tell(message, self()); + } + + /** + * Called when disconnected from Mesos. + */ + private void disconnected(Disconnected message) { + connectionMonitor.tell(message, self()); + launchCoordinator.tell(message, self()); + reconciliationCoordinator.tell(message, self()); + taskRouter.tell(message, self()); + } + + /** + * Called when an error is reported by the scheduler callback. + */ + private void error(String message) { + self().tell(new FatalErrorOccurred("Connection to Mesos failed", new Exception(message)), self()); + } + + /** + * Invoked when a Mesos task reaches a terminal status. + */ + private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) { + // this callback occurs for failed containers and for released containers alike + + final ResourceID id = extractResourceID(taskID); + + try { + workerStore.removeWorker(taskID); + } + catch(Exception ex) { + fatalError("unable to remove worker", ex); + return; + } + + // check if this is a failed task or a released task + if (workersBeingReturned.remove(id) != null) { + // regular finished worker that we released + LOG.info("Worker {} finished successfully with diagnostics: {}", + id, status.getMessage()); + } else { + // failed worker, either at startup, or running + final MesosWorkerStore.Worker launched = workersInLaunch.remove(id); + if (launched != null) { + LOG.info("Mesos task {} failed, with a TaskManager in launch or registration. " + + "State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage()); + // we will trigger re-acquiring new workers at the end + } else { + // failed registered worker + LOG.info("Mesos task {} failed, with a registered TaskManager. 
" + + "State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage()); + + // notify the generic logic, which notifies the JobManager, etc. + notifyWorkerFailed(id, "Mesos task " + id + " failed. State: " + status.getState()); + } + + // general failure logging + failedTasksSoFar++; + + String diagMessage = String.format("Diagnostics for task %s in state %s : " + + "reason=%s message=%s", + id, status.getState(), status.getReason(), status.getMessage()); + sendInfoMessage(diagMessage); + + LOG.info(diagMessage); + LOG.info("Total number of failed tasks so far: " + failedTasksSoFar); + + // maxFailedTasks == -1 is infinite number of retries. + if (maxFailedTasks >= 0 && failedTasksSoFar > maxFailedTasks) { + String msg = "Stopping Mesos session because the number of failed tasks (" + + failedTasksSoFar + ") exceeded the maximum failed tasks (" + + maxFailedTasks + "). This number is controlled by the '" + + ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. " + + "By default its the number of requested tasks."; + + LOG.error(msg); + self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)), + ActorRef.noSender()); + + // no need to do anything else + return; + } + } + + // in case failed containers were among the finished containers, make + // sure we re-examine and request new ones + triggerCheckWorkers(); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) { + LaunchableMesosWorker launchable = + new LaunchableMesosWorker(taskManagerParameters, taskManagerLaunchContext, taskID); + return launchable; + } + + /** + * Extracts a unique ResourceID from the Mesos task. 
+ * + * @param taskId the Mesos TaskID + * @return The ResourceID for the container + */ + static ResourceID extractResourceID(Protos.TaskID taskId) { + return new ResourceID(taskId.getValue()); + } + + /** + * Extracts the Mesos task goal state from the worker information. + * @param worker the persistent worker information. + * @return goal state information for the {@Link TaskMonitor}. + */ + static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) { + switch(worker.state()) { + case New: return new TaskMonitor.New(worker.taskID()); + case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get()); + case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get()); + default: throw new IllegalArgumentException(); + } + } + + /** + * Creates the Fenzo optimizer (builder). + * The builder is an indirection to faciliate unit testing of the Launch Coordinator. + */ + private static TaskSchedulerBuilder createOptimizer() { + return new TaskSchedulerBuilder() { + TaskScheduler.Builder builder = new TaskScheduler.Builder(); + + @Override + public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action) { + builder.withLeaseRejectAction(action); + return this; + } + + @Override + public TaskScheduler build() { + return builder.build(); + } + }; + } + + /** + * Creates the props needed to instantiate this actor. + * + * Rather than extracting and validating parameters in the constructor, this factory method takes + * care of that. That way, errors occur synchronously, and are not swallowed simply in a + * failed asynchronous attempt to start the actor. + + * @param actorClass + * The actor class, to allow overriding this actor with subclasses for testing. + * @param flinkConfig + * The Flink configuration object. + * @param taskManagerParameters + * The parameters for launching TaskManager containers. 
+     * @param taskManagerLaunchContext
+     *             The parameters for launching the TaskManager processes in the TaskManager containers.
+     * @param numInitialTaskManagers
+     *             The initial number of TaskManagers to allocate. Also used as the default for the
+     *             maximum number of failed tasks when that setting is absent from the configuration.
+     * @param log
+     *             The logger to log to.
+     * @param mesosConfig
+     *             The Mesos configuration; handed to the actor unchanged.
+     * @param workerStore
+     *             The worker store; handed to the actor unchanged.
+     * @param leaderRetrievalService
+     *             The leader retrieval service; handed to the actor unchanged.
+     *
+     * @return The Props object to instantiate the MesosFlinkResourceManager actor.
+     */
+    public static Props createActorProps(Class<? extends MesosFlinkResourceManager> actorClass,
+            Configuration flinkConfig,
+            MesosConfiguration mesosConfig,
+            MesosWorkerStore workerStore,
+            LeaderRetrievalService leaderRetrievalService,
+            MesosTaskManagerParameters taskManagerParameters,
+            Protos.TaskInfo.Builder taskManagerLaunchContext,
+            int numInitialTaskManagers,
+            Logger log)
+    {
+        // the failure limit falls back to the initial number of TaskManagers
+        final int maxFailedTasks = flinkConfig.getInteger(
+            ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
+        // a negative limit is not announced in the log
+        if (maxFailedTasks >= 0) {
+            log.info("Mesos framework tolerates {} failed tasks before giving up",
+                maxFailedTasks);
+        }
+
+        // note: the logger is not among the arguments passed on to the actor
+        return Props.create(actorClass,
+            flinkConfig,
+            mesosConfig,
+            workerStore,
+            leaderRetrievalService,
+            taskManagerParameters,
+            taskManagerLaunchContext,
+            maxFailedTasks,
+            numInitialTaskManagers);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
new file mode 100644
index 0000000..b3956aa
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -0,0 +1,51 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.ConfigConstants;
+import
org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; + +import static java.util.Objects.requireNonNull; + +public class MesosTaskManagerParameters { + + private double cpus; + + private ContaineredTaskManagerParameters containeredParameters; + + public MesosTaskManagerParameters(double cpus, ContaineredTaskManagerParameters containeredParameters) { + requireNonNull(containeredParameters); + this.cpus = cpus; + this.containeredParameters = containeredParameters; + } + + public double cpus() { + return cpus; + } + + public ContaineredTaskManagerParameters containeredParameters() { + return containeredParameters; + } + + @Override + public String toString() { + return "MesosTaskManagerParameters{" + + "cpus=" + cpus + + ", containeredParameters=" + containeredParameters + + '}'; + } + + /** + * Create the Mesos TaskManager parameters. + * @param flinkConfig the TM configuration. + * @param containeredParameters additional containered parameters. 
+ */ + public static MesosTaskManagerParameters create( + Configuration flinkConfig, + ContaineredTaskManagerParameters containeredParameters) { + + double cpus = flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS, + Math.max(containeredParameters.numSlots(), 1.0)); + + return new MesosTaskManagerParameters(cpus, containeredParameters); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b2be05/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java new file mode 100644 index 0000000..5dfc75e --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java @@ -0,0 +1,99 @@ +package org.apache.flink.mesos.runtime.clusterframework; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.Map; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The entry point for running a TaskManager in a Mesos container. 
+ */ +public class MesosTaskManagerRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws IOException { + EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args); + org.apache.flink.runtime.util.SignalHandler.register(LOG); + + // try to parse the command line arguments + final Configuration configuration; + try { + configuration = TaskManager.parseArgsAndLoadConfig(args); + + // add dynamic properties to TaskManager configuration. + final Configuration dynamicProperties = + FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + configuration.addAll(dynamicProperties); + } + catch (Throwable t) { + LOG.error(t.getMessage(), t); + System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); + return; + } + + // read the environment variables + final Map<String, String> envs = System.getenv(); + final String effectiveUsername = envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME); + final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR); + + // configure local directory + String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); + if (flinkTempDirs != null) { + LOG.info("Overriding Mesos temporary file directories with those " + + "specified in the Flink config: " + flinkTempDirs); + } + else if (tmpDirs != null) { + LOG.info("Setting directories for temporary files to: " + tmpDirs); + configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs); + } + + LOG.info("Mesos task runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() + + "', setting user to execute Flink TaskManager to '" + effectiveUsername + "'"); 
+ + // tell akka to die in case of an error + configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(effectiveUsername); + for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) { + ugi.addToken(toks); + } + + // Infer the resource identifier from the environment variable + String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID)); + final ResourceID resourceId = new ResourceID(containerID); + LOG.info("ResourceID assigned for this container: {}", resourceId); + + ugi.doAs(new PrivilegedAction<Object>() { + @Override + public Object run() { + try { + TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager); + } + catch (Throwable t) { + LOG.error("Error while starting the TaskManager", t); + System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); + } + return null; + } + }); + } +}