Repository: flink
Updated Branches:
  refs/heads/master 3b85f42dc -> 8d7c3ff08


http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index d844f5d..2a33c44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -23,6 +23,8 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import com.typesafe.config.Config;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,6 +292,40 @@ public class BootstrapTools {
                config.addAll(replacement);
        }
 
+       private static final String DYNAMIC_PROPERTIES_OPT = "D";
+
+       /**
+        * Get an instance of the dynamic properties option.
+        *
+        * Dynamic properties allow the user to specify additional 
configuration values with -D, such as
+        *  -Dfs.overwrite-files=true  
-Dtaskmanager.network.numberOfBuffers=16368
+     */
+       public static Option newDynamicPropertiesOption() {
+               return new Option(DYNAMIC_PROPERTIES_OPT, true, "Dynamic 
properties");
+       }
+
+       /**
+        * Parse the dynamic properties (passed on the command line).
+        */
+       public static Configuration parseDynamicProperties(CommandLine cmd) {
+               final Configuration config = new Configuration();
+
+               String[] values = cmd.getOptionValues(DYNAMIC_PROPERTIES_OPT);
+               if(values != null) {
+                       for(String value : values) {
+                               String[] pair = value.split("=", 2);
+                               if(pair.length == 1) {
+                                       config.setString(pair[0], 
Boolean.TRUE.toString());
+                               }
+                               else if(pair.length == 2) {
+                                       config.setString(pair[0], pair[1]);
+                               }
+                       }
+               }
+
+               return config;
+       }
+
        /**
         * Generates the shell command to start a task manager.
         * @param flinkConfig The Flink configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
new file mode 100644
index 0000000..508a28c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Encapsulates a container specification, including artifacts, environment 
variables,
+ * system properties, and Flink configuration settings.
+ *
+ * The specification is mutable.
+ *
+ * Note that the Flink configuration settings are considered dynamic overrides 
of whatever
+ * static configuration file is present in the container.  For example, a 
container might be
+ * based on a Docker image with a normal Flink installation with customized 
settings, which these
+ * settings would (partially) override.
+ *
+ * Artifacts are copied into a sandbox directory within the container, which 
any Flink process
+ * launched in the container is assumed to use as a working directory.  This 
assumption allows
+ * for relative paths to be used in certain environment variables.
+ */
+public class ContainerSpecification implements java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final Configuration systemProperties;
+
+       private final List<Artifact> artifacts;
+
+       private final Map<String,String> environmentVariables;
+
+       private final Configuration dynamicConfiguration;
+
+       public ContainerSpecification() {
+               this.artifacts = new LinkedList<>();
+               this.environmentVariables = new HashMap<String,String>();
+               this.systemProperties = new Configuration();
+               this.dynamicConfiguration = new Configuration();
+       }
+
+       /**
+        * Get the container artifacts.
+     */
+       public List<Artifact> getArtifacts() {
+               return artifacts;
+       }
+
+       /**
+        * Get the environment variables.
+     */
+       public Map<String, String> getEnvironmentVariables() {
+               return environmentVariables;
+       }
+
+       /**
+        * Get the dynamic configuration.
+     */
+       public Configuration getDynamicConfiguration() {
+               return dynamicConfiguration;
+       }
+
+       /**
+        * Get the system properties.
+     */
+       public Configuration getSystemProperties() {
+               return systemProperties;
+       }
+
+       @Override
+       protected Object clone() throws CloneNotSupportedException {
+               ContainerSpecification clone = new ContainerSpecification();
+               clone.artifacts.addAll(this.artifacts);
+               clone.environmentVariables.putAll(this.environmentVariables);
+               clone.systemProperties.addAll(this.systemProperties);
+               clone.dynamicConfiguration.addAll(this.dynamicConfiguration);
+               return clone;
+       }
+
+       @Override
+       public String toString() {
+               return "ContainerSpecification{" +
+                       "environmentVariables=" + environmentVariables +
+                       ", systemProperties=" + systemProperties +
+                       ", dynamicConfiguration=" + dynamicConfiguration +
+                       ", artifacts=" + artifacts +
+                       '}';
+       }
+
+       /**
+        * An artifact to be copied into the container.
+        */
+       public static class Artifact {
+
+               public Artifact(Path source, Path dest, boolean executable, 
boolean cachable, boolean extract) {
+                       checkArgument(source.isAbsolute(), "source must be 
absolute");
+                       checkArgument(!dest.isAbsolute(), "destination must be 
relative");
+                       this.source = source;
+                       this.dest = dest;
+                       this.executable = executable;
+                       this.cachable = cachable;
+                       this.extract = extract;
+               }
+
+               public final Path source;
+               public final Path dest;
+               public final boolean executable;
+               public final boolean cachable;
+               public final boolean extract;
+
+               @Override
+               public String toString() {
+                       return "Artifact{" +
+                               "source=" + source +
+                               ", dest=" + dest +
+                               ", executable=" + executable +
+                               ", cachable=" + cachable +
+                               ", extract=" + extract +
+                               '}';
+               }
+
+               public static Builder newBuilder() { return new Builder(); }
+
+               public static class Builder {
+
+                       public Path source;
+                       public Path dest;
+                       public boolean executable = false;
+                       public boolean cachable = true;
+                       public boolean extract = false;
+
+                       public Builder setSource(Path source) {
+                               this.source = source;
+                               return this;
+                       }
+
+                       public Builder setDest(Path dest) {
+                               this.dest = dest;
+                               return this;
+                       }
+
+                       public Builder setCachable(boolean cachable) {
+                               this.cachable = cachable;
+                               return this;
+                       }
+
+                       public Builder setExtract(boolean extract) {
+                               this.extract = extract;
+                               return this;
+                       }
+
+                       public Builder setExecutable(boolean executable) {
+                               this.executable = executable;
+                               return this;
+                       }
+
+                       public Artifact build() {
+                               return new Artifact(source, dest, executable, 
cachable, extract);
+                       }
+               }
+       }
+
+       /**
+        * Format the system properties as a shell-compatible command-line 
argument.
+     */
+       public static String formatSystemProperties(Configuration jvmArgs) {
+               StringBuilder sb = new StringBuilder();
+               for(Map.Entry<String,String> entry : 
jvmArgs.toMap().entrySet()) {
+                       if(sb.length() > 0) {
+                               sb.append(" ");
+                       }
+                       boolean quoted = entry.getValue().contains(" ");
+                       if(quoted) {
+                               sb.append("\"");
+                       }
+                       
sb.append("-D").append(entry.getKey()).append('=').append(entry.getValue());
+                       if(quoted) {
+                               sb.append("\"");
+                       }
+               }
+               return sb.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
new file mode 100644
index 0000000..007146a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+
+/**
+ * An abstract container overlay.
+ */
+abstract class AbstractContainerOverlay implements ContainerOverlay {
+
+       /**
+        * Add a path recursively to the container specification.
+        *
+        * If the path is a directory, the directory itself (not just its 
contents) is added to the target path.
+        *
+        * The execute bit is preserved; permissions aren't.
+        *
+        * @param sourcePath the path to add.
+        * @param targetPath the target path.
+        * @param env the specification to mutate.
+     * @throws IOException
+     */
+       protected void addPathRecursively(
+               final File sourcePath, final Path targetPath, final 
ContainerSpecification env) throws IOException {
+
+               final java.nio.file.Path sourceRoot = 
sourcePath.toPath().getParent();
+
+               Files.walkFileTree(sourcePath.toPath(), new 
SimpleFileVisitor<java.nio.file.Path>() {
+                       @Override
+                       public FileVisitResult visitFile(java.nio.file.Path 
file, BasicFileAttributes attrs) throws IOException {
+
+                               java.nio.file.Path relativePath = 
sourceRoot.relativize(file);
+
+                               ContainerSpecification.Artifact.Builder 
artifact = ContainerSpecification.Artifact.newBuilder()
+                                       .setSource(new Path(file.toUri()))
+                                       .setDest(new Path(targetPath, 
relativePath.toString()))
+                                       .setExecutable(Files.isExecutable(file))
+                                       .setCachable(true)
+                                       .setExtract(false);
+
+                               env.getArtifacts().add(artifact.build());
+
+                               return super.visitFile(file, attrs);
+                       }
+               });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
new file mode 100644
index 0000000..11e8f21
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A composite overlay that delegates to a set of inner overlays.
+ */
+public class CompositeContainerOverlay implements ContainerOverlay {
+
+       private final List<ContainerOverlay> overlays;
+
+       public CompositeContainerOverlay(ContainerOverlay... overlays) {
+               this(Arrays.asList(overlays));
+       }
+
+       public CompositeContainerOverlay(List<ContainerOverlay> overlays) {
+               this.overlays = Collections.unmodifiableList(overlays);
+       }
+
+       @Override
+       public void configure(ContainerSpecification containerConfig) throws 
IOException {
+               for(ContainerOverlay overlay : overlays) {
+                       overlay.configure(containerConfig);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
new file mode 100644
index 0000000..62826e2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
+import java.io.IOException;
+
+/**
+ * A container overlay to produce a container specification.
+ *
+ * An overlay applies configuration elements, environment variables,
+ * system properties, and artifacts to a container specification.
+ */
+public interface ContainerOverlay {
+
+       /**
+        * Configure the given container specification.
+     */
+       void configure(ContainerSpecification containerSpecification) throws 
IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
new file mode 100644
index 0000000..a36cc67
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
+import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Overlays Flink into a container, based on supplied bin/conf/lib directories.
+ *
+ * The overlayed Flink is indistinguishable from (and interchangeable with)
+ * a normal installation of Flink.  For a docker image-based container, it 
should be
+ * possible to bypass this overlay and rely on the normal installation method.
+ *
+ * The following files are copied to the container:
+ *  - flink/bin/
+ *  - flink/conf/
+ *  - flink/lib/
+ */
+public class FlinkDistributionOverlay extends AbstractContainerOverlay {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDistributionOverlay.class);
+
+       static final Path TARGET_ROOT = new Path("flink");
+
+       final File flinkBinPath;
+       final File flinkConfPath;
+       final File flinkLibPath;
+
+       public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, 
File flinkLibPath) {
+               this.flinkBinPath = checkNotNull(flinkBinPath);
+               this.flinkConfPath = checkNotNull(flinkConfPath);
+               this.flinkLibPath = checkNotNull(flinkLibPath);
+       }
+
+       @Override
+       public void configure(ContainerSpecification container) throws 
IOException {
+
+               container.getEnvironmentVariables().put(ENV_FLINK_HOME_DIR, 
TARGET_ROOT.toString());
+
+               // add the paths to the container specification.
+               addPathRecursively(flinkBinPath, TARGET_ROOT, container);
+               addPathRecursively(flinkConfPath, TARGET_ROOT, container);
+               addPathRecursively(flinkLibPath, TARGET_ROOT, container);
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * A builder for the {@link FlinkDistributionOverlay}.
+        */
+       public static class Builder {
+               File flinkBinPath;
+               File flinkConfPath;
+               File flinkLibPath;
+
+               /**
+                * Configures the overlay using the current environment.
+                *
+                * Locates Flink using FLINK_???_DIR environment variables as 
provided to all Flink processes by config.sh.
+                *
+                * @param globalConfiguration the current configuration.
+                */
+               public Builder fromEnvironment(Configuration 
globalConfiguration) {
+
+                       Map<String,String> env = System.getenv();
+                       if(env.containsKey(ENV_FLINK_BIN_DIR)) {
+                               flinkBinPath = new 
File(System.getenv(ENV_FLINK_BIN_DIR));
+                       }
+                       else {
+                               throw new 
IllegalStateException(String.format("the {} environment variable must be set", 
ENV_FLINK_BIN_DIR));
+                       }
+
+                       if(env.containsKey(ENV_FLINK_CONF_DIR)) {
+                               flinkConfPath = new 
File(System.getenv(ENV_FLINK_CONF_DIR));
+                       }
+                       else {
+                               throw new 
IllegalStateException(String.format("the {} environment variable must be set", 
ENV_FLINK_CONF_DIR));
+                       }
+
+                       if(env.containsKey(ENV_FLINK_LIB_DIR)) {
+                               flinkLibPath = new 
File(System.getenv(ENV_FLINK_LIB_DIR));
+                       }
+                       else {
+                               throw new 
IllegalStateException(String.format("the {} environment variable must be set", 
ENV_FLINK_LIB_DIR));
+                       }
+
+                       return this;
+               }
+
+               public FlinkDistributionOverlay build() {
+                       return new FlinkDistributionOverlay(flinkBinPath, 
flinkConfPath, flinkLibPath);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
new file mode 100644
index 0000000..bd79218
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Overlays a Hadoop configuration into a container, based on a supplied Hadoop
+ * configuration directory.
+ *
+ * The following files are copied to the container:
+ *  - hadoop/conf/core-site.xml
+ *  - hadoop/conf/hdfs-site.xml
+ *
+ * The following environment variables are set in the container:
+ *  - HADOOP_CONF_DIR
+ *
+ * The folloowing Flink configuration entries are updated:
+ *  - fs.hdfs.hadoopconf
+ */
+public class HadoopConfOverlay implements ContainerOverlay {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HadoopConfOverlay.class);
+
+       /**
+        * The (relative) directory into which the Hadoop conf is copied.
+        */
+       static final Path TARGET_CONF_DIR = new Path("hadoop/conf");
+
+       final File hadoopConfDir;
+
+       public HadoopConfOverlay(@Nullable File hadoopConfDir) {
+               this.hadoopConfDir = hadoopConfDir;
+       }
+
+       @Override
+       public void configure(ContainerSpecification container) throws 
IOException {
+
+               if(hadoopConfDir == null) {
+                       return;
+               }
+
+               File coreSitePath = new File(hadoopConfDir, "core-site.xml");
+               File hdfsSitePath = new File(hadoopConfDir, "hdfs-site.xml");
+
+               container.getEnvironmentVariables().put("HADOOP_CONF_DIR", 
TARGET_CONF_DIR.toString());
+               
container.getDynamicConfiguration().setString(ConfigConstants.PATH_HADOOP_CONFIG,
 TARGET_CONF_DIR.toString());
+
+               container.getArtifacts().add(ContainerSpecification.Artifact
+                       .newBuilder()
+                       .setSource(new Path(coreSitePath.toURI()))
+                       .setDest(new Path(TARGET_CONF_DIR, 
coreSitePath.getName()))
+                       .setCachable(true)
+                       .build());
+
+               container.getArtifacts().add(ContainerSpecification.Artifact
+                       .newBuilder()
+                       .setSource(new Path(hdfsSitePath.toURI()))
+                       .setDest(new Path(TARGET_CONF_DIR, 
hdfsSitePath.getName()))
+                       .setCachable(true)
+                       .build());
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * A builder for the {@link HadoopConfOverlay}.
+        */
+       public static class Builder {
+
+               File hadoopConfDir;
+
+               /**
+                * Configures the overlay using the current environment's 
Hadoop configuration.
+                *
+                * The following locations are checked for a Hadoop 
configuration:
+                *  - (conf) fs.hdfs.hadoopconf
+                *  - (env)  HADOOP_CONF_DIR
+                *  - (env)  HADOOP_HOME/conf
+                *  - (env)  HADOOP_HOME/etc/hadoop
+                *
+                */
+               public Builder fromEnvironment(Configuration 
globalConfiguration) {
+
+                       String[] possibleHadoopConfPaths = new String[4];
+                       possibleHadoopConfPaths[0] = 
globalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+                       possibleHadoopConfPaths[1] = 
System.getenv("HADOOP_CONF_DIR");
+
+                       if (System.getenv("HADOOP_HOME") != null) {
+                               possibleHadoopConfPaths[2] = 
System.getenv("HADOOP_HOME")+"/conf";
+                               possibleHadoopConfPaths[3] = 
System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
+                       }
+
+                       for (String possibleHadoopConfPath : 
possibleHadoopConfPaths) {
+                               if (possibleHadoopConfPath != null) {
+                                       File confPath = new 
File(possibleHadoopConfPath);
+
+                                       File coreSitePath = new File(confPath, 
"core-site.xml");
+                                       File hdfsSitePath = new File(confPath, 
"hdfs-site.xml");
+
+                                       if (coreSitePath.exists() && 
hdfsSitePath.exists()) {
+                                               this.hadoopConfDir = confPath;
+                                               break;
+                                       }
+                               }
+                       }
+
+                       if(hadoopConfDir == null) {
+                               LOG.warn("Unable to locate a Hadoop 
configuration; HDFS will use defaults.");
+                       }
+
+                       return this;
+               }
+
+               public HadoopConfOverlay build() {
+                       return new HadoopConfOverlay(hadoopConfDir);
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
new file mode 100644
index 0000000..7081aea
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * Overlays a Hadoop user context into a container.
+ *
+ * The overlay essentially configures Hadoop's {@link UserGroupInformation} 
class,
+ * establishing the effective username for filesystem calls to HDFS in 
non-secure clusters.
+ *
+ * In secure clusters, the configured keytab establishes the effective user.
+ *
+ * The following environment variables are set in the container:
+ *  - HADOOP_USER_NAME
+ */
+public class HadoopUserOverlay implements ContainerOverlay {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HadoopUserOverlay.class);
+
+       private final UserGroupInformation ugi;
+
+       public HadoopUserOverlay(@Nullable UserGroupInformation ugi) {
+               this.ugi = ugi;
+       }
+
+       @Override
+       public void configure(ContainerSpecification container) throws 
IOException {
+               if(ugi != null) {
+                       // overlay the Hadoop user identity (w/ tokens)
+                       
container.getEnvironmentVariables().put("HADOOP_USER_NAME", ugi.getUserName());
+               }
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * A builder for the {@link HadoopUserOverlay}.
+        */
+       public static class Builder {
+
+               UserGroupInformation ugi;
+
+               /**
+                * Configures the overlay using the current Hadoop user 
information (from {@link UserGroupInformation}).
+         */
+               public Builder fromEnvironment(Configuration 
globalConfiguration) throws IOException {
+                       ugi = UserGroupInformation.getCurrentUser();
+                       return this;
+               }
+
+               public HadoopUserOverlay build() {
+                       return new HadoopUserOverlay(ugi);
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
new file mode 100644
index 0000000..7fe5b3e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container.
+ *
+ * The folloowing Flink configuration entries are updated:
+ *  - security.keytab
+ */
+public class KeytabOverlay extends AbstractContainerOverlay {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KeytabOverlay.class);
+
+       static final Path TARGET_PATH = new Path("krb5.keytab");
+
+       final Path keytab;
+
+       public KeytabOverlay(@Nullable File keytab) {
+               this.keytab = keytab != null ? new Path(keytab.toURI()) : null;
+       }
+
+       public KeytabOverlay(@Nullable Path keytab) {
+               this.keytab = keytab;
+       }
+
+       @Override
+       public void configure(ContainerSpecification container) throws 
IOException {
+               if(keytab != null) {
+                       
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+                               .setSource(keytab)
+                               .setDest(TARGET_PATH)
+                               .setCachable(false)
+                               .build());
+                       
container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY,
 TARGET_PATH.getPath());
+               }
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * A builder for the {@link HadoopUserOverlay}.
+        */
+       public static class Builder {
+
+               File keytabPath;
+
+               /**
+                * Configures the overlay using the current environment (and 
global configuration).
+                *
+                * The following Flink configuration settings are checked for a 
keytab:
+                *  - security.keytab
+                */
+               public Builder fromEnvironment(Configuration 
globalConfiguration) {
+                       String keytab = 
globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+                       if(keytab != null) {
+                               keytabPath = new File(keytab);
+                               if(!keytabPath.exists()) {
+                                       throw new 
IllegalStateException("Invalid configuration for " +
+                                               
ConfigConstants.SECURITY_KEYTAB_KEY +
+                                               "; '" + keytab + "' not 
found.");
+                               }
+                       }
+
+                       return this;
+               }
+
+               public KeytabOverlay build() {
+                       return new KeytabOverlay(keytabPath);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
new file mode 100644
index 0000000..fb161b9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays a Kerberos configuration file into a container.
+ *
+ * The following files are copied to the container:
+ *  - krb5.conf
+ *
+ * The following Java system properties are set in the container:
+ *  - java.security.krb5.conf
+ */
+public class Krb5ConfOverlay extends AbstractContainerOverlay {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(Krb5ConfOverlay.class);
+
+       static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
+
+       static final Path TARGET_PATH = new Path("krb5.conf");
+       final Path krb5Conf;
+
+       public Krb5ConfOverlay(@Nullable File krb5Conf) {
+               this.krb5Conf = krb5Conf != null ? new Path(krb5Conf.toURI()) : 
null;
+       }
+
+       public Krb5ConfOverlay(@Nullable Path krb5Conf) {
+               this.krb5Conf = krb5Conf;
+       }
+
+       @Override
+       public void configure(ContainerSpecification container) throws 
IOException {
+               if(krb5Conf != null) {
+                       
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+                               .setSource(krb5Conf)
+                               .setDest(TARGET_PATH)
+                               .setCachable(true)
+                               .build());
+                       
container.getSystemProperties().setString(JAVA_SECURITY_KRB5_CONF, 
TARGET_PATH.getPath());
+               }
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * A builder for the {@link Krb5ConfOverlay}.
+        */
+       public static class Builder {
+
+               File krb5ConfPath;
+
+               /**
+                * Configures the overlay using the current environment.
+                *
+                * Locates the krb5.conf configuration file as per
+                * <a 
href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html";>Java
 documentation</a>.
+                * Note that the JRE doesn't support the KRB5_CONFIG 
environment variable (JDK-7045913).
+                */
+               public Builder fromEnvironment(Configuration 
globalConfiguration) {
+
+                       // check the system property
+                       String krb5Config = 
System.getProperty(JAVA_SECURITY_KRB5_CONF);
+                       if(krb5Config != null && krb5Config.length() != 0) {
+                               krb5ConfPath = new File(krb5Config);
+                               if(!krb5ConfPath.exists()) {
+                                       throw new 
IllegalStateException("java.security.krb5.conf refers to a non-existent file");
+                               }
+                       }
+
+                       // FUTURE: check the well-known paths
+                       // - $JAVA_HOME/lib/security
+                       // - %WINDIR%\krb5.ini (Windows)
+                       // - /etc/krb5.conf (Linux)
+
+                       return this;
+               }
+
+               public Krb5ConfOverlay build() {
+                       return new Krb5ConfOverlay(krb5ConfPath);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
new file mode 100644
index 0000000..dd79ca1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+
+
+/**
+ * Overlays an SSL keystore/truststore into a container.
+ *
+ * The following files are placed into the container:
+ *  - keystore.jks
+ *  - truststore.jks
+ *
+ * The following Flink configuration entries are set:
+ *  - security.ssl.keystore
+ *  - security.ssl.truststore
+ */
+public class SSLStoreOverlay extends AbstractContainerOverlay {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SSLStoreOverlay.class);
+
+       static final Path TARGET_KEYSTORE_PATH = new Path("keystore.jks");
+       static final Path TARGET_TRUSTSTORE_PATH = new Path("truststore.jks");
+
+       final Path keystore;
+       final Path truststore;
+
+       public SSLStoreOverlay(@Nullable File keystoreFile, @Nullable File 
truststoreFile) {
+               this.keystore = keystoreFile != null ? new 
Path(keystoreFile.toURI()) : null;
+               this.truststore = truststoreFile != null ? new 
Path(truststoreFile.toURI()) : null;
+       }
+
+       @Override
+       public void configure(ContainerSpecification container) throws 
IOException {
+               if(keystore != null) {
+                       
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+                               .setSource(keystore)
+                               .setDest(TARGET_KEYSTORE_PATH)
+                               .setCachable(false)
+                               .build());
+                       
container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_KEYSTORE,
 TARGET_KEYSTORE_PATH.getPath());
+               }
+               if(truststore != null) {
+                       
container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder()
+                               .setSource(truststore)
+                               .setDest(TARGET_TRUSTSTORE_PATH)
+                               .setCachable(false)
+                               .build());
+                       
container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
 TARGET_TRUSTSTORE_PATH.getPath());
+               }
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * A builder for the {@link Krb5ConfOverlay}.
+        */
+       public static class Builder {
+
+               File keystorePath;
+
+               File truststorePath;
+
+               /**
+                * Configures the overlay using the current environment (and 
global configuration).
+                *
+                * The following Flink configuration settings are used to 
source the keystore and truststore:
+                *  - security.ssl.keystore
+                *  - security.ssl.truststore
+                */
+               public Builder fromEnvironment(Configuration 
globalConfiguration)  {
+
+                       String keystore = 
globalConfiguration.getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null);
+                       if(keystore != null) {
+                               keystorePath = new File(keystore);
+                               if(!keystorePath.exists()) {
+                                       throw new 
IllegalStateException("Invalid configuration for " + 
ConfigConstants.SECURITY_SSL_KEYSTORE);
+                               }
+                       }
+
+                       String truststore = 
globalConfiguration.getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null);
+                       if(truststore != null) {
+                               truststorePath = new File(truststore);
+                               if(!truststorePath.exists()) {
+                                       throw new 
IllegalStateException("Invalid configuration for " + 
ConfigConstants.SECURITY_SSL_TRUSTSTORE);
+                               }
+                       }
+
+                       return this;
+               }
+
+               public SSLStoreOverlay build() {
+                       return new SSLStoreOverlay(keystorePath, 
truststorePath);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index f6e0a8c..7416cc6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -53,7 +53,7 @@ public class SecurityUtils {
 
        public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
 
-       private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = 
"java.security.auth.login.config";
+       public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = 
"java.security.auth.login.config";
 
        private static final String ZOOKEEPER_SASL_CLIENT = 
"zookeeper.sasl.client";
 
@@ -130,6 +130,8 @@ public class SecurityUtils {
                                loginUser = UserGroupInformation.getLoginUser();
                        }
 
+                       LOG.info("Hadoop user set to {}", loginUser.toString());
+
                        boolean delegationToken = false;
                        final Text HDFS_DELEGATION_KIND = new 
Text("HDFS_DELEGATION_TOKEN");
                        Collection<Token<? extends TokenIdentifier>> usrTok = 
loginUser.getTokens();

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
new file mode 100644
index 0000000..bbea376
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ContainerOverlayTestBase {
+
+       private Map<String, String> originalEnvironment;
+
+       @Before
+       public void before() {
+               originalEnvironment = new HashMap<>(System.getenv());
+       }
+
+       @After
+       public void after() {
+               CommonTestUtils.setEnv(originalEnvironment, true);
+       }
+
+
+       /**
+        * Create an empty file for each given path.
+        * @param root the root folder in which to create the files.
+        * @param paths the relative paths to create.
+     */
+       protected static Path[] createPaths(File root, String... paths) throws 
Exception {
+               Path[] files = new Path[paths.length];
+               for(int i = 0; i < paths.length; i++) {
+                       File file = root.toPath().resolve(paths[i]).toFile();
+                       file.getParentFile().mkdirs();
+                       file.createNewFile();
+                       files[i] = new Path(paths[i]);
+               }
+               return files;
+       }
+
+       /**
+        * Check that an artifact exists for the given remote path.
+     */
+       protected static ContainerSpecification.Artifact 
checkArtifact(ContainerSpecification spec, Path remotePath) {
+               for(ContainerSpecification.Artifact artifact : 
spec.getArtifacts()) {
+                       if(remotePath.equals(artifact.dest)) {
+                               return artifact;
+                       }
+               }
+               throw new AssertionError("no such artifact (" + remotePath + 
")");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
new file mode 100644
index 0000000..e77dd3a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
+import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+
+import static 
org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay.TARGET_ROOT;
+
+public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testConfigure() throws Exception {
+
+               File binFolder = tempFolder.newFolder("bin");
+               File libFolder = tempFolder.newFolder("lib");
+               File confFolder = tempFolder.newFolder("conf");
+
+               Path[] files = createPaths(
+                       tempFolder.getRoot(),
+                       "bin/config.sh",
+                       "bin/taskmanager.sh",
+                       "lib/foo.jar",
+                       "lib/A/foo.jar",
+                       "lib/B/foo.jar",
+                       "lib/B/bar.jar");
+
+               ContainerSpecification containerSpecification = new 
ContainerSpecification();
+               FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
+                       binFolder,
+                       confFolder,
+                       libFolder
+               );
+               overlay.configure(containerSpecification);
+
+               for(Path file : files) {
+                       checkArtifact(containerSpecification, new 
Path(TARGET_ROOT, file.toString()));
+               }
+       }
+
+       @Test
+       public void testBuilderFromEnvironment() throws Exception {
+               Configuration conf = new Configuration();
+
+               File binFolder = tempFolder.newFolder("bin");
+               File libFolder = tempFolder.newFolder("lib");
+               File confFolder = tempFolder.newFolder("conf");
+
+               // adjust the test environment for the purposes of this test
+               Map<String, String> map = new HashMap<String, 
String>(System.getenv());
+               map.put(ENV_FLINK_BIN_DIR, binFolder.getAbsolutePath());
+               map.put(ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
+               map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
+               CommonTestUtils.setEnv(map);
+
+               FlinkDistributionOverlay.Builder builder = 
FlinkDistributionOverlay.newBuilder().fromEnvironment(conf);
+
+               assertEquals(binFolder.getAbsolutePath(), 
builder.flinkBinPath.getAbsolutePath());
+               assertEquals(libFolder.getAbsolutePath(), 
builder.flinkLibPath.getAbsolutePath());
+               assertEquals(confFolder.getAbsolutePath(), 
builder.flinkConfPath.getAbsolutePath());
+       }
+
+       @Test
+       public void testBuilderFromEnvironmentBad() throws Exception {
+               Configuration conf = new Configuration();
+
+               // adjust the test environment for the purposes of this test
+               Map<String, String> map = new HashMap<>(System.getenv());
+               map.remove(ENV_FLINK_BIN_DIR);
+               map.remove(ENV_FLINK_LIB_DIR);
+               map.remove(ENV_FLINK_CONF_DIR);
+               CommonTestUtils.setEnv(map);
+
+               try {
+                       FlinkDistributionOverlay.Builder builder = 
FlinkDistributionOverlay.newBuilder().fromEnvironment(conf);
+                       fail();
+               }
+               catch(IllegalStateException e) {
+                       // expected
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
new file mode 100644
index 0000000..c3ea41b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+import static 
org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay.TARGET_CONF_DIR;
+
+public class HadoopConfOverlayTest extends ContainerOverlayTestBase {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testConfigure() throws Exception {
+
+               File confDir = tempFolder.newFolder();
+               initConfDir(confDir);
+
+               HadoopConfOverlay overlay = new HadoopConfOverlay(confDir);
+
+               ContainerSpecification spec = new ContainerSpecification();
+               overlay.configure(spec);
+
+               assertEquals(TARGET_CONF_DIR.getPath(), 
spec.getEnvironmentVariables().get("HADOOP_CONF_DIR"));
+               assertEquals(TARGET_CONF_DIR.getPath(), 
spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, 
null));
+
+               checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml"));
+               checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml"));
+       }
+
+       @Test
+       public void testNoConf() throws Exception {
+               HadoopConfOverlay overlay = new HadoopConfOverlay(null);
+
+               ContainerSpecification containerSpecification = new 
ContainerSpecification();
+               overlay.configure(containerSpecification);
+       }
+
+       @Test
+       public void testBuilderFromEnvironment() throws Exception {
+
+               // verify that the builder picks up various environment 
locations
+               HadoopConfOverlay.Builder builder;
+               Map<String, String> env;
+
+               // fs.hdfs.hadoopconf
+               File confDir = tempFolder.newFolder();
+               initConfDir(confDir);
+               Configuration conf = new Configuration();
+               conf.setString(ConfigConstants.PATH_HADOOP_CONFIG, 
confDir.getAbsolutePath());
+               builder = HadoopConfOverlay.newBuilder().fromEnvironment(conf);
+               assertEquals(confDir, builder.hadoopConfDir);
+
+               // HADOOP_CONF_DIR
+               env = new HashMap<String, String>(System.getenv());
+               env.remove("HADOOP_HOME");
+               env.put("HADOOP_CONF_DIR", confDir.getAbsolutePath());
+               CommonTestUtils.setEnv(env);
+               builder = HadoopConfOverlay.newBuilder().fromEnvironment(new 
Configuration());
+               assertEquals(confDir, builder.hadoopConfDir);
+
+               // HADOOP_HOME/conf
+               File homeDir = tempFolder.newFolder();
+               confDir = initConfDir(new File(homeDir, "conf"));
+               env = new HashMap<String, String>(System.getenv());
+               env.remove("HADOOP_CONF_DIR");
+               env.put("HADOOP_HOME", homeDir.getAbsolutePath());
+               CommonTestUtils.setEnv(env);
+               builder = HadoopConfOverlay.newBuilder().fromEnvironment(new 
Configuration());
+               assertEquals(confDir, builder.hadoopConfDir);
+
+               // HADOOP_HOME/etc/hadoop
+               homeDir = tempFolder.newFolder();
+               confDir = initConfDir(new File(homeDir, "etc/hadoop"));
+               env = new HashMap<String, String>(System.getenv());
+               env.remove("HADOOP_CONF_DIR");
+               env.put("HADOOP_HOME", homeDir.getAbsolutePath());
+               CommonTestUtils.setEnv(env);
+               builder = HadoopConfOverlay.newBuilder().fromEnvironment(new 
Configuration());
+               assertEquals(confDir, builder.hadoopConfDir);
+       }
+
+       private File initConfDir(File confDir) throws Exception {
+               confDir.mkdirs();
+               new File(confDir, "core-site.xml").createNewFile();
+               new File(confDir, "hdfs-site.xml").createNewFile();
+               return confDir;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
new file mode 100644
index 0000000..7a463b8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.security.PrivilegedAction;
+
+import static org.junit.Assert.assertEquals;
+
+public class HadoopUserOverlayTest extends ContainerOverlayTestBase {
+
+       @Test
+       public void testConfigure() throws Exception {
+
+               final UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser("test");
+
+               HadoopUserOverlay overlay = new HadoopUserOverlay(ugi);
+
+               ContainerSpecification spec = new ContainerSpecification();
+               overlay.configure(spec);
+
+               assertEquals(ugi.getUserName(), 
spec.getEnvironmentVariables().get("HADOOP_USER_NAME"));
+       }
+
+       @Test
+       public void testNoConf() throws Exception {
+               HadoopUserOverlay overlay = new HadoopUserOverlay(null);
+
+               ContainerSpecification containerSpecification = new 
ContainerSpecification();
+               overlay.configure(containerSpecification);
+       }
+
+       @Test
+       public void testBuilderFromEnvironment() throws Exception {
+
+               final Configuration conf = new Configuration();
+               final UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser("test");
+
+               ugi.doAs(new PrivilegedAction<Object>() {
+                       @Override
+                       public Object run() {
+                               try {
+                                       HadoopUserOverlay.Builder builder = 
HadoopUserOverlay.newBuilder().fromEnvironment(conf);
+                                       assertEquals(ugi, builder.ugi);
+                                       return null;
+                               }
+                               catch(Exception ex) {
+                                       throw new AssertionError(ex);
+                               }
+                       }
+               });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
new file mode 100644
index 0000000..0570f28
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static 
org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay.TARGET_PATH;
+import static org.junit.Assert.assertEquals;
+
+public class KeytabOverlayTest extends ContainerOverlayTestBase {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testConfigure() throws Exception {
+
+               File keytab = tempFolder.newFile();
+
+               KeytabOverlay overlay = new KeytabOverlay(keytab);
+
+               ContainerSpecification spec = new ContainerSpecification();
+               overlay.configure(spec);
+
+               assertEquals(TARGET_PATH.getPath(), 
spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, 
null));
+               checkArtifact(spec, TARGET_PATH);
+       }
+
+       @Test
+       public void testNoConf() throws Exception {
+               KeytabOverlay overlay = new KeytabOverlay((Path) null);
+
+               ContainerSpecification containerSpecification = new 
ContainerSpecification();
+               overlay.configure(containerSpecification);
+       }
+
+       @Test
+       public void testBuilderFromEnvironment() throws Exception {
+
+               final Configuration conf = new Configuration();
+               File keytab = tempFolder.newFile();
+
+               conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, 
keytab.getAbsolutePath());
+               KeytabOverlay.Builder builder = 
KeytabOverlay.newBuilder().fromEnvironment(conf);
+               assertEquals(builder.keytabPath, keytab);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
new file mode 100644
index 0000000..1f86b89
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static 
org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.JAVA_SECURITY_KRB5_CONF;
+import static 
org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.TARGET_PATH;
+import static org.junit.Assert.assertEquals;
+
+public class Krb5ConfOverlayTest extends ContainerOverlayTestBase {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testConfigure() throws Exception {
+
+               File krb5conf = tempFolder.newFile();
+
+               Krb5ConfOverlay overlay = new Krb5ConfOverlay(krb5conf);
+
+               ContainerSpecification spec = new ContainerSpecification();
+               overlay.configure(spec);
+
+               assertEquals(TARGET_PATH.getPath(), 
spec.getSystemProperties().getString(JAVA_SECURITY_KRB5_CONF, null));
+               checkArtifact(spec, TARGET_PATH);
+       }
+
+       @Test
+       public void testNoConf() throws Exception {
+               Krb5ConfOverlay overlay = new Krb5ConfOverlay((Path) null);
+
+               ContainerSpecification containerSpecification = new 
ContainerSpecification();
+               overlay.configure(containerSpecification);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
new file mode 100644
index 0000000..0894ce6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.overlays;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static 
org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_KEYSTORE_PATH;
+import static 
org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_TRUSTSTORE_PATH;
+import static org.junit.Assert.assertEquals;
+
+public class SSLStoreOverlayTest extends ContainerOverlayTestBase {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testConfigure() throws Exception {
+
+               File keystore = tempFolder.newFile();
+               File truststore = tempFolder.newFile();
+               SSLStoreOverlay overlay = new SSLStoreOverlay(keystore, 
truststore);
+
+               ContainerSpecification spec = new ContainerSpecification();
+               overlay.configure(spec);
+
+               assertEquals(TARGET_KEYSTORE_PATH.getPath(), 
spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
null));
+               checkArtifact(spec, TARGET_KEYSTORE_PATH);
+
+               assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), 
spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
 null));
+               checkArtifact(spec, TARGET_TRUSTSTORE_PATH);
+       }
+
+       @Test
+       public void testNoConf() throws Exception {
+               SSLStoreOverlay overlay = new SSLStoreOverlay(null, null);
+
+               ContainerSpecification containerSpecification = new 
ContainerSpecification();
+               overlay.configure(containerSpecification);
+       }
+
+       @Test
+       public void testBuilderFromEnvironment() throws Exception {
+
+               final Configuration conf = new Configuration();
+               File keystore = tempFolder.newFile();
+               File truststore = tempFolder.newFile();
+
+               conf.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
keystore.getAbsolutePath());
+               conf.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
truststore.getAbsolutePath());
+
+               SSLStoreOverlay.Builder builder = 
SSLStoreOverlay.newBuilder().fromEnvironment(conf);
+               assertEquals(builder.keystorePath, keystore);
+               assertEquals(builder.truststorePath, truststore);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index d318a3c..45c5a77 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -29,6 +29,9 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.Map;
 
 import static org.junit.Assert.fail;
 
@@ -114,4 +117,40 @@ public class CommonTestUtils {
                        fail("Cannot determine Java version: " + 
e.getMessage());
                }
        }
+
+       // This code is taken from: http://stackoverflow.com/a/7201825/568695
+       // it changes the environment variables of this JVM. Use only for 
testing purposes!
+       @SuppressWarnings("unchecked")
+       public static void setEnv(Map<String, String> newenv) {
+               try {
+                       Class<?> processEnvironmentClass = 
Class.forName("java.lang.ProcessEnvironment");
+                       Field theEnvironmentField = 
processEnvironmentClass.getDeclaredField("theEnvironment");
+                       theEnvironmentField.setAccessible(true);
+                       Map<String, String> env = (Map<String, String>) 
theEnvironmentField.get(null);
+                       env.putAll(newenv);
+                       Field theCaseInsensitiveEnvironmentField = 
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+                       theCaseInsensitiveEnvironmentField.setAccessible(true);
+                       Map<String, String> cienv = (Map<String, String>) 
theCaseInsensitiveEnvironmentField.get(null);
+                       cienv.putAll(newenv);
+               } catch (NoSuchFieldException e) {
+                       try {
+                               Class<?>[] classes = 
Collections.class.getDeclaredClasses();
+                               Map<String, String> env = System.getenv();
+                               for (Class<?> cl : classes) {
+                                       if 
("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+                                               Field field = 
cl.getDeclaredField("m");
+                                               field.setAccessible(true);
+                                               Object obj = field.get(env);
+                                               Map<String, String> map = 
(Map<String, String>) obj;
+                                               map.clear();
+                                               map.putAll(newenv);
+                                       }
+                               }
+                       } catch (Exception e2) {
+                               throw new RuntimeException(e2);
+                       }
+               } catch (Exception e1) {
+                       throw new RuntimeException(e1);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index aa5e7d3..804b3d4 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
@@ -58,7 +59,6 @@ import java.io.FileReader;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Field;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -542,42 +542,10 @@ public class TestBaseUtils extends TestLogger {
                return configs;
        }
 
-       // This code is taken from: http://stackoverflow.com/a/7201825/568695
-       // it changes the environment variables of this JVM. Use only for 
testing purposes!
-       @SuppressWarnings("unchecked")
        public static void setEnv(Map<String, String> newenv) {
-               try {
-                       Class<?> processEnvironmentClass = 
Class.forName("java.lang.ProcessEnvironment");
-                       Field theEnvironmentField = 
processEnvironmentClass.getDeclaredField("theEnvironment");
-                       theEnvironmentField.setAccessible(true);
-                       Map<String, String> env = (Map<String, String>) 
theEnvironmentField.get(null);
-                       env.putAll(newenv);
-                       Field theCaseInsensitiveEnvironmentField = 
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
-                       theCaseInsensitiveEnvironmentField.setAccessible(true);
-                       Map<String, String> cienv = (Map<String, String>) 
theCaseInsensitiveEnvironmentField.get(null);
-                       cienv.putAll(newenv);
-               } catch (NoSuchFieldException e) {
-                       try {
-                               Class<?>[] classes = 
Collections.class.getDeclaredClasses();
-                               Map<String, String> env = System.getenv();
-                               for (Class<?> cl : classes) {
-                                       if 
("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
-                                               Field field = 
cl.getDeclaredField("m");
-                                               field.setAccessible(true);
-                                               Object obj = field.get(env);
-                                               Map<String, String> map = 
(Map<String, String>) obj;
-                                               map.clear();
-                                               map.putAll(newenv);
-                                       }
-                               }
-                       } catch (Exception e2) {
-                               throw new RuntimeException(e2);
-                       }
-               } catch (Exception e1) {
-                       throw new RuntimeException(e1);
-               }
+               CommonTestUtils.setEnv(newenv);
        }
-       
+
        private static ExecutionContext defaultExecutionContext() {
                return ExecutionContext$.MODULE$.global();
        }

Reply via email to