Repository: flink
Updated Branches:
  refs/heads/master fcb13e1f5 -> 8ef6926f3


[FLINK-5975] Add volume support to flink-mesos

When using containerization, specifically, docker, it is useful to be
able to attach additional volumes, such as an NFS share.

This adds support for volumes to be attached via specifying a new config
values `mesos.resourcemanager.tasks.container.volumes`. This is comma
delimited string of `[host_path:]container_path[:RO|RW]`.

It is modeled after the spark mesos framework

[FLINK-5975] Address code review, simplify handling of modes

[FLINK-5975] Fix volume config key to match actual

[FLINK-5975] make building volume info eager

[FLINK-5975] add volume info before setting container info

[FLINK-5975] Always set volumes even in no image

In the event that no container is used, we should still be able to use
volumes with the mesos containerizer.

This closes #3481.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ef9672a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ef9672a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ef9672a

Branch: refs/heads/master
Commit: 4ef9672a9da78bc4ce02e56d9ecdcc546da23e42
Parents: fcb13e1
Author: Addison Higham <ahig...@instructure.com>
Authored: Mon Mar 6 23:40:16 2017 -0700
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 28 09:24:12 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |   8 +-
 docs/setup/mesos.md                             |   2 +
 .../clusterframework/LaunchableMesosWorker.java |  46 +++++----
 .../MesosTaskManagerParameters.java             | 101 +++++++++++++++++--
 .../MesosFlinkResourceManagerTest.java          |   8 +-
 .../MesosTaskManagerParametersTest.java         |  77 ++++++++++++++
 6 files changed, 205 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ef9672a/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index e36f149..d8a7082 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -152,7 +152,7 @@ Below is a list of currently first-class supported 
connectors or components by F
 
 - Zookeeper (for HA): see 
[here]({{site.baseurl}}/setup/jobmanager_high_availability.html#configuring-for-zookeeper-security)
 for details on Zookeeper security configuration to work with the 
Kerberos-based security configurations mentioned here.
 
-For more information on how Flink security internally setups Kerberos 
authentication, please see [here]({{site.baseurl}}/ops/security-kerberos.html). 
+For more information on how Flink security internally setups Kerberos 
authentication, please see [here]({{site.baseurl}}/ops/security-kerberos.html).
 
 ### Other
 
@@ -196,7 +196,7 @@ will be used under the directory specified by 
jobmanager.web.tmpdir.
 
 - `blob.service.ssl.enabled`: Flag to enable ssl for the blob client/server 
communication. This is applicable only when the global ssl flag 
security.ssl.enabled is set to true (DEFAULT: true).
 
-- `restart-strategy`: Default [restart 
strategy]({{site.baseurl}}/dev/restart_strategies.html) to use in case no 
+- `restart-strategy`: Default [restart 
strategy]({{site.baseurl}}/dev/restart_strategies.html) to use in case no
 restart strategy has been specified for the job.
 The options are:
     - fixed delay strategy: `fixed-delay`.
@@ -209,7 +209,7 @@ The options are:
 Default value is 1, unless "fixed-delay" was activated by enabling 
checkpoints, in which case the default is `Integer.MAX_VALUE`.
 
 - `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used 
if the default restart strategy is set to "fixed-delay".
-Default value is the `akka.ask.timeout`, unless "fixed-delay" was activated by 
enabling checkpoints, in which case 
+Default value is the `akka.ask.timeout`, unless "fixed-delay" was activated by 
enabling checkpoints, in which case
 the default is 10s.
 
 - `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of 
restarts in given time interval before failing a job in "failure-rate" strategy.
@@ -484,6 +484,8 @@ May be set to -1 to disable this feature.
 
 - `mesos.resourcemanager.tasks.container.image.name`: Image name to use for 
the container (**NO DEFAULT**)
 
+- `mesos.resourcemanager.tasks.container.volumes`: A comma seperated list of 
[host_path:]container_path[:RO|RW]. This allows for mounting additional volumes 
into your container. (**NO DEFAULT**)
+
 - `high-availability.zookeeper.path.mesos-workers`: The ZooKeeper root path 
for persisting the Mesos worker information.
 
 ### High Availability (HA)

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef9672a/docs/setup/mesos.md
----------------------------------------------------------------------
diff --git a/docs/setup/mesos.md b/docs/setup/mesos.md
index 2ccee65..62168e7 100644
--- a/docs/setup/mesos.md
+++ b/docs/setup/mesos.md
@@ -257,3 +257,5 @@ May be set to -1 to disable this feature.
 `mesos.resourcemanager.tasks.container.type`: Type of the containerization 
used: "mesos" or "docker" (DEFAULT: mesos);
 
 `mesos.resourcemanager.tasks.container.image.name`: Image name to use for the 
container (**NO DEFAULT**)
+
+`mesos.resourcemanager.tasks.container.volumes`: A comma seperated list of 
[host_path:]container_path[:RO|RW]. This allows for mounting additional volumes 
into your container. (**NO DEFAULT**)

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef9672a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index bfe9be8..ff5472e 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -28,7 +28,10 @@ import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.util.Preconditions;
 import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -44,10 +47,11 @@ import static org.apache.flink.mesos.Utils.scalar;
  * Implements the launch of a Mesos worker.
  *
  * Translates the abstract {@link ContainerSpecification} into a concrete
- * Mesos-specific {@link org.apache.mesos.Protos.TaskInfo}.
+ * Mesos-specific {@link Protos.TaskInfo}.
  */
 public class LaunchableMesosWorker implements LaunchableTask {
 
+       protected static final Logger LOG = 
LoggerFactory.getLogger(LaunchableMesosWorker.class);
        /**
         * The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
         */
@@ -69,12 +73,14 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
         * @param taskID the taskID for this worker.
         */
        public LaunchableMesosWorker(
-               MesosArtifactResolver resolver, MesosTaskManagerParameters 
params,
-               ContainerSpecification containerSpec, Protos.TaskID taskID) {
-               this.resolver = resolver;
-               this.params = params;
-               this.containerSpec = containerSpec;
-               this.taskID = taskID;
+                       MesosArtifactResolver resolver,
+                       MesosTaskManagerParameters params,
+                       ContainerSpecification containerSpec,
+                       Protos.TaskID taskID) {
+               this.resolver = Preconditions.checkNotNull(resolver);
+               this.params = Preconditions.checkNotNull(params);
+               this.containerSpec = Preconditions.checkNotNull(containerSpec);
+               this.taskID = Preconditions.checkNotNull(taskID);
                this.taskRequest = new Request();
        }
 
@@ -232,23 +238,25 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                cmd.setValue(launchCommand.toString());
 
                // build the container info
-               Protos.ContainerInfo.Builder containerInfo = null;
+               Protos.ContainerInfo.Builder containerInfo = 
Protos.ContainerInfo.newBuilder();
+               // in event that no docker image or mesos image name is 
specified, we must still
+               // set type to MESOS
+               containerInfo.setType(Protos.ContainerInfo.Type.MESOS);
                switch(params.containerType()) {
                        case MESOS:
                                if(params.containerImageName().isDefined()) {
-                                       containerInfo = 
Protos.ContainerInfo.newBuilder()
-                                               
.setType(Protos.ContainerInfo.Type.MESOS)
+                                       containerInfo
                                                
.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
-                                               
.setImage(Protos.Image.newBuilder()
-                                                       
.setType(Protos.Image.Type.DOCKER)
-                                                       
.setDocker(Protos.Image.Docker.newBuilder()
-                                                               
.setName(params.containerImageName().get()))));
+                                                       
.setImage(Protos.Image.newBuilder()
+                                                               
.setType(Protos.Image.Type.DOCKER)
+                                                               
.setDocker(Protos.Image.Docker.newBuilder()
+                                                                       
.setName(params.containerImageName().get()))));
                                }
                                break;
 
                        case DOCKER:
                                assert(params.containerImageName().isDefined());
-                               containerInfo = 
Protos.ContainerInfo.newBuilder()
+                                       containerInfo
                                        
.setType(Protos.ContainerInfo.Type.DOCKER)
                                        
.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
                                                
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
@@ -258,9 +266,11 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                        default:
                                throw new IllegalStateException("unsupported 
container type");
                }
-               if(containerInfo != null) {
-                       taskInfo.setContainer(containerInfo);
-               }
+
+               // add any volumes to the containerInfo
+               containerInfo.addAllVolumes(params.containerVolumes());
+               taskInfo.setContainer(containerInfo);
+
 
                return taskInfo.build();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef9672a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 044bffe..6064c46 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -23,9 +23,14 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.util.Preconditions;
+import org.apache.mesos.Protos;
 import scala.Option;
 
-import static java.util.Objects.requireNonNull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
@@ -56,6 +61,9 @@ public class MesosTaskManagerParameters {
                key("mesos.resourcemanager.tasks.container.image.name")
                        .noDefaultValue();
 
+       public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
+               key("mesos.resourcemanager.tasks.container.volumes")
+                       .noDefaultValue();
        /**
         * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} 
setting. Tells to use the Mesos containerizer.
         */
@@ -73,19 +81,24 @@ public class MesosTaskManagerParameters {
 
        private final ContaineredTaskManagerParameters containeredParameters;
 
+       private final List<Protos.Volume> containerVolumes;
+
        public MesosTaskManagerParameters(
-               double cpus,
-               ContainerType containerType,
-               Option<String> containerImageName,
-               ContaineredTaskManagerParameters containeredParameters) {
-               requireNonNull(containeredParameters);
+                       double cpus,
+                       ContainerType containerType,
+                       Option<String> containerImageName,
+                       ContaineredTaskManagerParameters containeredParameters,
+                       List<Protos.Volume> containerVolumes) {
+
                this.cpus = cpus;
-               this.containerType = containerType;
-               this.containerImageName = containerImageName;
-               this.containeredParameters = containeredParameters;
+               this.containerType = Preconditions.checkNotNull(containerType);
+               this.containerImageName = 
Preconditions.checkNotNull(containerImageName);
+               this.containeredParameters = 
Preconditions.checkNotNull(containeredParameters);
+               this.containerVolumes = 
Preconditions.checkNotNull(containerVolumes);
        }
 
-       /**
+
+    /**
         * Get the CPU units to use for the TaskManager process.
      */
        public double cpus() {
@@ -115,6 +128,13 @@ public class MesosTaskManagerParameters {
                return containeredParameters;
        }
 
+       /**
+        * Get the container volumes string
+        */
+       public List<Protos.Volume> containerVolumes() {
+               return containerVolumes;
+       }
+
        @Override
        public String toString() {
                return "MesosTaskManagerParameters{" +
@@ -122,6 +142,7 @@ public class MesosTaskManagerParameters {
                        ", containerType=" + containerType +
                        ", containerImageName=" + containerImageName +
                        ", containeredParameters=" + containeredParameters +
+                       ", containerVolumes=" + containerVolumes +
                        '}';
        }
 
@@ -162,11 +183,69 @@ public class MesosTaskManagerParameters {
                                throw new 
IllegalConfigurationException("invalid container type: " + containerTypeString);
                }
 
+               Option<String> containerVolOpt = 
Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));
+
+               List<Protos.Volume> containerVolumes = 
buildVolumes(containerVolOpt);
+
                return new MesosTaskManagerParameters(
                        cpus,
                        containerType,
                        Option.apply(imageName),
-                       containeredParameters);
+                       containeredParameters,
+                       containerVolumes);
+       }
+
+       /**
+        * Used to build volume specs for mesos. This allows for mounting 
additional volumes into a container
+        *
+        * @param containerVolumes a comma delimited optional string of 
[host_path:]container_path[:RO|RW] that
+        *                         defines mount points for a container volume. 
If None or empty string, returns
+        *                         an empty iterator
+        */
+       public static List<Protos.Volume> buildVolumes(Option<String> 
containerVolumes) {
+               if (containerVolumes.isEmpty()) {
+                       return Collections.emptyList();
+               } else {
+                       String[] volumeSpecifications = 
containerVolumes.get().split(",");
+
+                       List<Protos.Volume> volumes = new 
ArrayList<>(volumeSpecifications.length);
+
+                       for (String volumeSpecification : volumeSpecifications) 
{
+                               if (!volumeSpecification.trim().isEmpty()) {
+                                       Protos.Volume.Builder volume = 
Protos.Volume.newBuilder();
+                                       volume.setMode(Protos.Volume.Mode.RW);
+
+                                       String[] parts = 
volumeSpecification.split(":");
+
+                                       switch (parts.length) {
+                                               case 1:
+                                                       
volume.setContainerPath(parts[0]);
+                                                       break;
+                                               case 2:
+                                                       try {
+                                                               
Protos.Volume.Mode mode = 
Protos.Volume.Mode.valueOf(parts[1].trim().toUpperCase());
+                                                               
volume.setMode(mode)
+                                                                       
.setContainerPath(parts[0]);
+                                                       } catch 
(IllegalArgumentException e) {
+                                                               
volume.setHostPath(parts[0])
+                                                                       
.setContainerPath(parts[1]);
+                                                       }
+                                                       break;
+                                               case 3:
+                                                       Protos.Volume.Mode mode 
= Protos.Volume.Mode.valueOf(parts[2].trim().toUpperCase());
+                                                       volume.setMode(mode)
+                                                               
.setHostPath(parts[0])
+                                                               
.setContainerPath(parts[1]);
+                                                       break;
+                                               default:
+                                                       throw new 
IllegalArgumentException("volume specification is invalid, given: " + 
volumeSpecification);
+                                       }
+
+                                       volumes.add(volume.build());
+                               }
+                       }
+                       return volumes;
+               }
        }
 
        public enum ContainerType {

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef9672a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index c854a17..745d35f 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -58,11 +58,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.*;
+
 import static java.util.Collections.singletonList;
-import java.util.HashMap;
 
 import static 
org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
 import static 
org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
@@ -202,7 +200,7 @@ public class MesosFlinkResourceManagerTest extends 
TestLogger {
                        ContaineredTaskManagerParameters containeredParams =
                                new ContaineredTaskManagerParameters(1024, 768, 
256, 4, new HashMap<String, String>());
                        MesosTaskManagerParameters tmParams = new 
MesosTaskManagerParameters(
-                               1.0, 
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), 
containeredParams);
+                               1.0, 
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), 
containeredParams, new ArrayList<Protos.Volume>());
 
                        TestActorRef<TestingMesosFlinkResourceManager> 
resourceManagerRef =
                                TestActorRef.create(system, 
MesosFlinkResourceManager.createActorProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef9672a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
new file mode 100644
index 0000000..c4a7bcb
--- /dev/null
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+import org.apache.mesos.Protos;
+import org.junit.Test;
+import scala.Option;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class MesosTaskManagerParametersTest extends TestLogger {
+
+       @Test
+       public void testBuildVolumes() throws Exception {
+               List<Protos.Volume> vols;
+               
assertEquals(MesosTaskManagerParameters.buildVolumes(Option.<String>apply(null)).size(),
 0);
+               String spec1 = 
"/host/path:/container/path:RO,/container/path:ro,/host/path:/container/path,/container/path";
+               vols = 
MesosTaskManagerParameters.buildVolumes(Option.<String>apply(spec1));
+               assertEquals(vols.size(), 4);
+               assertEquals("/container/path", vols.get(0).getContainerPath());
+               assertEquals("/host/path", vols.get(0).getHostPath());
+               assertEquals(Protos.Volume.Mode.RO, vols.get(0).getMode());
+               assertEquals("/container/path", vols.get(1).getContainerPath());
+               assertEquals(Protos.Volume.Mode.RO, vols.get(1).getMode());
+               assertEquals("/container/path", vols.get(2).getContainerPath());
+               assertEquals("/host/path", vols.get(2).getHostPath());
+               assertEquals(Protos.Volume.Mode.RW, vols.get(2).getMode());
+               assertEquals("/container/path", vols.get(3).getContainerPath());
+               assertEquals(Protos.Volume.Mode.RW, vols.get(3).getMode());
+
+               // should handle empty strings, but not error
+               assertEquals(0, 
MesosTaskManagerParameters.buildVolumes(Option.<String>apply("")).size());
+       }
+
+       @Test(expected=IllegalArgumentException.class)
+       public void testBuildVolumesBadMode() throws Exception {
+               
MesosTaskManagerParameters.buildVolumes(Option.<String>apply("/hp:/cp:RF"));
+       }
+
+       @Test(expected=IllegalArgumentException.class)
+       public void testBuildVolumesMalformed() throws Exception {
+               
MesosTaskManagerParameters.buildVolumes(Option.<String>apply("/hp:/cp:ro:extra"));
+       }
+
+       @Test
+       public void testContainerVolumes() throws Exception {
+               Configuration config = new Configuration();
+               
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_VOLUMES, 
"/host/path:/container/path:ro");
+
+               MesosTaskManagerParameters params = 
MesosTaskManagerParameters.create(config);
+               assertEquals(1, params.containerVolumes().size());
+               assertEquals("/container/path", 
params.containerVolumes().get(0).getContainerPath());
+               assertEquals("/host/path", 
params.containerVolumes().get(0).getHostPath());
+               assertEquals(Protos.Volume.Mode.RO, 
params.containerVolumes().get(0).getMode());
+       }
+
+}

Reply via email to