Repository: flink Updated Branches: refs/heads/master 3b85f42dc -> 8d7c3ff08
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index d844f5d..2a33c44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -23,6 +23,8 @@ import akka.actor.ActorSystem; import akka.actor.Address; import com.typesafe.config.Config; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -290,6 +292,40 @@ public class BootstrapTools { config.addAll(replacement); } + private static final String DYNAMIC_PROPERTIES_OPT = "D"; + + /** + * Get an instance of the dynamic properties option. + * + * Dynamic properties allow the user to specify additional configuration values with -D, such as + * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368 + */ + public static Option newDynamicPropertiesOption() { + return new Option(DYNAMIC_PROPERTIES_OPT, true, "Dynamic properties"); + } + + /** + * Parse the dynamic properties (passed on the command line). + */ + public static Configuration parseDynamicProperties(CommandLine cmd) { + final Configuration config = new Configuration(); + + String[] values = cmd.getOptionValues(DYNAMIC_PROPERTIES_OPT); + if(values != null) { + for(String value : values) { + String[] pair = value.split("=", 2); + if(pair.length == 1) { + config.setString(pair[0], Boolean.TRUE.toString()); + } + else if(pair.length == 2) { + config.setString(pair[0], pair[1]); + } + } + } + + return config; + } + /** * Generates the shell command to start a task manager. * @param flinkConfig The Flink configuration. http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java new file mode 100644 index 0000000..508a28c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContainerSpecification.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Encapsulates a container specification, including artifacts, environment variables, + * system properties, and Flink configuration settings. + * + * The specification is mutable. + * + * Note that the Flink configuration settings are considered dynamic overrides of whatever + * static configuration file is present in the container. For example, a container might be + * based on a Docker image with a normal Flink installation with customized settings, which these + * settings would (partially) override. + * + * Artifacts are copied into a sandbox directory within the container, which any Flink process + * launched in the container is assumed to use as a working directory. This assumption allows + * for relative paths to be used in certain environment variables. + */ +public class ContainerSpecification implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private final Configuration systemProperties; + + private final List<Artifact> artifacts; + + private final Map<String,String> environmentVariables; + + private final Configuration dynamicConfiguration; + + public ContainerSpecification() { + this.artifacts = new LinkedList<>(); + this.environmentVariables = new HashMap<String,String>(); + this.systemProperties = new Configuration(); + this.dynamicConfiguration = new Configuration(); + } + + /** + * Get the container artifacts. + */ + public List<Artifact> getArtifacts() { + return artifacts; + } + + /** + * Get the environment variables. + */ + public Map<String, String> getEnvironmentVariables() { + return environmentVariables; + } + + /** + * Get the dynamic configuration. + */ + public Configuration getDynamicConfiguration() { + return dynamicConfiguration; + } + + /** + * Get the system properties. + */ + public Configuration getSystemProperties() { + return systemProperties; + } + + @Override + protected Object clone() throws CloneNotSupportedException { + ContainerSpecification clone = new ContainerSpecification(); + clone.artifacts.addAll(this.artifacts); + clone.environmentVariables.putAll(this.environmentVariables); + clone.systemProperties.addAll(this.systemProperties); + clone.dynamicConfiguration.addAll(this.dynamicConfiguration); + return clone; + } + + @Override + public String toString() { + return "ContainerSpecification{" + + "environmentVariables=" + environmentVariables + + ", systemProperties=" + systemProperties + + ", dynamicConfiguration=" + dynamicConfiguration + + ", artifacts=" + artifacts + + '}'; + } + + /** + * An artifact to be copied into the container. + */ + public static class Artifact { + + public Artifact(Path source, Path dest, boolean executable, boolean cachable, boolean extract) { + checkArgument(source.isAbsolute(), "source must be absolute"); + checkArgument(!dest.isAbsolute(), "destination must be relative"); + this.source = source; + this.dest = dest; + this.executable = executable; + this.cachable = cachable; + this.extract = extract; + } + + public final Path source; + public final Path dest; + public final boolean executable; + public final boolean cachable; + public final boolean extract; + + @Override + public String toString() { + return "Artifact{" + + "source=" + source + + ", dest=" + dest + + ", executable=" + executable + + ", cachable=" + cachable + + ", extract=" + extract + + '}'; + } + + public static Builder newBuilder() { return new Builder(); } + + public static class Builder { + + public Path source; + public Path dest; + public boolean executable = false; + public boolean cachable = true; + public boolean extract = false; + + public Builder setSource(Path source) { + this.source = source; + return this; + } + + public Builder setDest(Path dest) { + this.dest = dest; + return this; + } + + public Builder setCachable(boolean cachable) { + this.cachable = cachable; + return this; + } + + public Builder setExtract(boolean extract) { + this.extract = extract; + return this; + } + + public Builder setExecutable(boolean executable) { + this.executable = executable; + return this; + } + + public Artifact build() { + return new Artifact(source, dest, executable, cachable, extract); + } + } + } + + /** + * Format the system properties as a shell-compatible command-line argument. + */ + public static String formatSystemProperties(Configuration jvmArgs) { + StringBuilder sb = new StringBuilder(); + for(Map.Entry<String,String> entry : jvmArgs.toMap().entrySet()) { + if(sb.length() > 0) { + sb.append(" "); + } + boolean quoted = entry.getValue().contains(" "); + if(quoted) { + sb.append("\""); + } + sb.append("-D").append(entry.getKey()).append('=').append(entry.getValue()); + if(quoted) { + sb.append("\""); + } + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java new file mode 100644 index 0000000..007146a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; + +/** + * An abstract container overlay. + */ +abstract class AbstractContainerOverlay implements ContainerOverlay { + + /** + * Add a path recursively to the container specification. + * + * If the path is a directory, the directory itself (not just its contents) is added to the target path. + * + * The execute bit is preserved; permissions aren't. + * + * @param sourcePath the path to add. + * @param targetPath the target path. + * @param env the specification to mutate. + * @throws IOException + */ + protected void addPathRecursively( + final File sourcePath, final Path targetPath, final ContainerSpecification env) throws IOException { + + final java.nio.file.Path sourceRoot = sourcePath.toPath().getParent(); + + Files.walkFileTree(sourcePath.toPath(), new SimpleFileVisitor<java.nio.file.Path>() { + @Override + public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException { + + java.nio.file.Path relativePath = sourceRoot.relativize(file); + + ContainerSpecification.Artifact.Builder artifact = ContainerSpecification.Artifact.newBuilder() + .setSource(new Path(file.toUri())) + .setDest(new Path(targetPath, relativePath.toString())) + .setExecutable(Files.isExecutable(file)) + .setCachable(true) + .setExtract(false); + + env.getArtifacts().add(artifact.build()); + + return super.visitFile(file, attrs); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java new file mode 100644 index 0000000..11e8f21 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * A composite overlay that delegates to a set of inner overlays. + */ +public class CompositeContainerOverlay implements ContainerOverlay { + + private final List<ContainerOverlay> overlays; + + public CompositeContainerOverlay(ContainerOverlay... overlays) { + this(Arrays.asList(overlays)); + } + + public CompositeContainerOverlay(List<ContainerOverlay> overlays) { + this.overlays = Collections.unmodifiableList(overlays); + } + + @Override + public void configure(ContainerSpecification containerConfig) throws IOException { + for(ContainerOverlay overlay : overlays) { + overlay.configure(containerConfig); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java new file mode 100644 index 0000000..62826e2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.IOException; + +/** + * A container overlay to produce a container specification. + * + * An overlay applies configuration elements, environment variables, + * system properties, and artifacts to a container specification. + */ +public interface ContainerOverlay { + + /** + * Configure the given container specification. + */ + void configure(ContainerSpecification containerSpecification) throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java new file mode 100644 index 0000000..a36cc67 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Overlays Flink into a container, based on supplied bin/conf/lib directories. + * + * The overlayed Flink is indistinguishable from (and interchangeable with) + * a normal installation of Flink. For a docker image-based container, it should be + * possible to bypass this overlay and rely on the normal installation method. + * + * The following files are copied to the container: + * - flink/bin/ + * - flink/conf/ + * - flink/lib/ + */ +public class FlinkDistributionOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistributionOverlay.class); + + static final Path TARGET_ROOT = new Path("flink"); + + final File flinkBinPath; + final File flinkConfPath; + final File flinkLibPath; + + public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath) { + this.flinkBinPath = checkNotNull(flinkBinPath); + this.flinkConfPath = checkNotNull(flinkConfPath); + this.flinkLibPath = checkNotNull(flinkLibPath); + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + + container.getEnvironmentVariables().put(ENV_FLINK_HOME_DIR, TARGET_ROOT.toString()); + + // add the paths to the container specification. + addPathRecursively(flinkBinPath, TARGET_ROOT, container); + addPathRecursively(flinkConfPath, TARGET_ROOT, container); + addPathRecursively(flinkLibPath, TARGET_ROOT, container); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link FlinkDistributionOverlay}. + */ + public static class Builder { + File flinkBinPath; + File flinkConfPath; + File flinkLibPath; + + /** + * Configures the overlay using the current environment. + * + * Locates Flink using FLINK_???_DIR environment variables as provided to all Flink processes by config.sh. + * + * @param globalConfiguration the current configuration. + */ + public Builder fromEnvironment(Configuration globalConfiguration) { + + Map<String,String> env = System.getenv(); + if(env.containsKey(ENV_FLINK_BIN_DIR)) { + flinkBinPath = new File(System.getenv(ENV_FLINK_BIN_DIR)); + } + else { + throw new IllegalStateException(String.format("the {} environment variable must be set", ENV_FLINK_BIN_DIR)); + } + + if(env.containsKey(ENV_FLINK_CONF_DIR)) { + flinkConfPath = new File(System.getenv(ENV_FLINK_CONF_DIR)); + } + else { + throw new IllegalStateException(String.format("the {} environment variable must be set", ENV_FLINK_CONF_DIR)); + } + + if(env.containsKey(ENV_FLINK_LIB_DIR)) { + flinkLibPath = new File(System.getenv(ENV_FLINK_LIB_DIR)); + } + else { + throw new IllegalStateException(String.format("the {} environment variable must be set", ENV_FLINK_LIB_DIR)); + } + + return this; + } + + public FlinkDistributionOverlay build() { + return new FlinkDistributionOverlay(flinkBinPath, flinkConfPath, flinkLibPath); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java new file mode 100644 index 0000000..bd79218 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlay.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + +/** + * Overlays a Hadoop configuration into a container, based on a supplied Hadoop + * configuration directory. + * + * The following files are copied to the container: + * - hadoop/conf/core-site.xml + * - hadoop/conf/hdfs-site.xml + * + * The following environment variables are set in the container: + * - HADOOP_CONF_DIR + * + * The folloowing Flink configuration entries are updated: + * - fs.hdfs.hadoopconf + */ +public class HadoopConfOverlay implements ContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopConfOverlay.class); + + /** + * The (relative) directory into which the Hadoop conf is copied. + */ + static final Path TARGET_CONF_DIR = new Path("hadoop/conf"); + + final File hadoopConfDir; + + public HadoopConfOverlay(@Nullable File hadoopConfDir) { + this.hadoopConfDir = hadoopConfDir; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + + if(hadoopConfDir == null) { + return; + } + + File coreSitePath = new File(hadoopConfDir, "core-site.xml"); + File hdfsSitePath = new File(hadoopConfDir, "hdfs-site.xml"); + + container.getEnvironmentVariables().put("HADOOP_CONF_DIR", TARGET_CONF_DIR.toString()); + container.getDynamicConfiguration().setString(ConfigConstants.PATH_HADOOP_CONFIG, TARGET_CONF_DIR.toString()); + + container.getArtifacts().add(ContainerSpecification.Artifact + .newBuilder() + .setSource(new Path(coreSitePath.toURI())) + .setDest(new Path(TARGET_CONF_DIR, coreSitePath.getName())) + .setCachable(true) + .build()); + + container.getArtifacts().add(ContainerSpecification.Artifact + .newBuilder() + .setSource(new Path(hdfsSitePath.toURI())) + .setDest(new Path(TARGET_CONF_DIR, hdfsSitePath.getName())) + .setCachable(true) + .build()); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link HadoopConfOverlay}. + */ + public static class Builder { + + File hadoopConfDir; + + /** + * Configures the overlay using the current environment's Hadoop configuration. + * + * The following locations are checked for a Hadoop configuration: + * - (conf) fs.hdfs.hadoopconf + * - (env) HADOOP_CONF_DIR + * - (env) HADOOP_HOME/conf + * - (env) HADOOP_HOME/etc/hadoop + * + */ + public Builder fromEnvironment(Configuration globalConfiguration) { + + String[] possibleHadoopConfPaths = new String[4]; + possibleHadoopConfPaths[0] = globalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR"); + + if (System.getenv("HADOOP_HOME") != null) { + possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf"; + possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2 + } + + for (String possibleHadoopConfPath : possibleHadoopConfPaths) { + if (possibleHadoopConfPath != null) { + File confPath = new File(possibleHadoopConfPath); + + File coreSitePath = new File(confPath, "core-site.xml"); + File hdfsSitePath = new File(confPath, "hdfs-site.xml"); + + if (coreSitePath.exists() && hdfsSitePath.exists()) { + this.hadoopConfDir = confPath; + break; + } + } + } + + if(hadoopConfDir == null) { + LOG.warn("Unable to locate a Hadoop configuration; HDFS will use defaults."); + } + + return this; + } + + public HadoopConfOverlay build() { + return new HadoopConfOverlay(hadoopConfDir); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java new file mode 100644 index 0000000..7081aea --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; + +/** + * Overlays a Hadoop user context into a container. + * + * The overlay essentially configures Hadoop's {@link UserGroupInformation} class, + * establishing the effective username for filesystem calls to HDFS in non-secure clusters. + * + * In secure clusters, the configured keytab establishes the effective user. + * + * The following environment variables are set in the container: + * - HADOOP_USER_NAME + */ +public class HadoopUserOverlay implements ContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUserOverlay.class); + + private final UserGroupInformation ugi; + + public HadoopUserOverlay(@Nullable UserGroupInformation ugi) { + this.ugi = ugi; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(ugi != null) { + // overlay the Hadoop user identity (w/ tokens) + container.getEnvironmentVariables().put("HADOOP_USER_NAME", ugi.getUserName()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link HadoopUserOverlay}. + */ + public static class Builder { + + UserGroupInformation ugi; + + /** + * Configures the overlay using the current Hadoop user information (from {@link UserGroupInformation}). + */ + public Builder fromEnvironment(Configuration globalConfiguration) throws IOException { + ugi = UserGroupInformation.getCurrentUser(); + return this; + } + + public HadoopUserOverlay build() { + return new HadoopUserOverlay(ugi); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java new file mode 100644 index 0000000..7fe5b3e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container. + * + * The folloowing Flink configuration entries are updated: + * - security.keytab + */ +public class KeytabOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(KeytabOverlay.class); + + static final Path TARGET_PATH = new Path("krb5.keytab"); + + final Path keytab; + + public KeytabOverlay(@Nullable File keytab) { + this.keytab = keytab != null ? new Path(keytab.toURI()) : null; + } + + public KeytabOverlay(@Nullable Path keytab) { + this.keytab = keytab; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(keytab != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(keytab) + .setDest(TARGET_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link HadoopUserOverlay}. + */ + public static class Builder { + + File keytabPath; + + /** + * Configures the overlay using the current environment (and global configuration). + * + * The following Flink configuration settings are checked for a keytab: + * - security.keytab + */ + public Builder fromEnvironment(Configuration globalConfiguration) { + String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null); + if(keytab != null) { + keytabPath = new File(keytab); + if(!keytabPath.exists()) { + throw new IllegalStateException("Invalid configuration for " + + ConfigConstants.SECURITY_KEYTAB_KEY + + "; '" + keytab + "' not found."); + } + } + + return this; + } + + public KeytabOverlay build() { + return new KeytabOverlay(keytabPath); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java new file mode 100644 index 0000000..fb161b9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays a Kerberos configuration file into a container. + * + * The following files are copied to the container: + * - krb5.conf + * + * The following Java system properties are set in the container: + * - java.security.krb5.conf + */ +public class Krb5ConfOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(Krb5ConfOverlay.class); + + static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + + static final Path TARGET_PATH = new Path("krb5.conf"); + final Path krb5Conf; + + public Krb5ConfOverlay(@Nullable File krb5Conf) { + this.krb5Conf = krb5Conf != null ? new Path(krb5Conf.toURI()) : null; + } + + public Krb5ConfOverlay(@Nullable Path krb5Conf) { + this.krb5Conf = krb5Conf; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(krb5Conf != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(krb5Conf) + .setDest(TARGET_PATH) + .setCachable(true) + .build()); + container.getSystemProperties().setString(JAVA_SECURITY_KRB5_CONF, TARGET_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link Krb5ConfOverlay}. + */ + public static class Builder { + + File krb5ConfPath; + + /** + * Configures the overlay using the current environment. + * + * Locates the krb5.conf configuration file as per + * <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">Java documentation</a>. + * Note that the JRE doesn't support the KRB5_CONFIG environment variable (JDK-7045913). + */ + public Builder fromEnvironment(Configuration globalConfiguration) { + + // check the system property + String krb5Config = System.getProperty(JAVA_SECURITY_KRB5_CONF); + if(krb5Config != null && krb5Config.length() != 0) { + krb5ConfPath = new File(krb5Config); + if(!krb5ConfPath.exists()) { + throw new IllegalStateException("java.security.krb5.conf refers to a non-existent file"); + } + } + + // FUTURE: check the well-known paths + // - $JAVA_HOME/lib/security + // - %WINDIR%\krb5.ini (Windows) + // - /etc/krb5.conf (Linux) + + return this; + } + + public Krb5ConfOverlay build() { + return new Krb5ConfOverlay(krb5ConfPath); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java new file mode 100644 index 0000000..dd79ca1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays an SSL keystore/truststore into a container. + * + * The following files are placed into the container: + * - keystore.jks + * - truststore.jks + * + * The following Flink configuration entries are set: + * - security.ssl.keystore + * - security.ssl.truststore + */ +public class SSLStoreOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(SSLStoreOverlay.class); + + static final Path TARGET_KEYSTORE_PATH = new Path("keystore.jks"); + static final Path TARGET_TRUSTSTORE_PATH = new Path("truststore.jks"); + + final Path keystore; + final Path truststore; + + public SSLStoreOverlay(@Nullable File keystoreFile, @Nullable File truststoreFile) { + this.keystore = keystoreFile != null ? new Path(keystoreFile.toURI()) : null; + this.truststore = truststoreFile != null ? new Path(truststoreFile.toURI()) : null; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(keystore != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(keystore) + .setDest(TARGET_KEYSTORE_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath()); + } + if(truststore != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(truststore) + .setDest(TARGET_TRUSTSTORE_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder for the {@link Krb5ConfOverlay}. + */ + public static class Builder { + + File keystorePath; + + File truststorePath; + + /** + * Configures the overlay using the current environment (and global configuration). + * + * The following Flink configuration settings are used to source the keystore and truststore: + * - security.ssl.keystore + * - security.ssl.truststore + */ + public Builder fromEnvironment(Configuration globalConfiguration) { + + String keystore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null); + if(keystore != null) { + keystorePath = new File(keystore); + if(!keystorePath.exists()) { + throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_KEYSTORE); + } + } + + String truststore = globalConfiguration.getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null); + if(truststore != null) { + truststorePath = new File(truststore); + if(!truststorePath.exists()) { + throw new IllegalStateException("Invalid configuration for " + ConfigConstants.SECURITY_SSL_TRUSTSTORE); + } + } + + return this; + } + + public SSLStoreOverlay build() { + return new SSLStoreOverlay(keystorePath, truststorePath); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java index f6e0a8c..7416cc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java @@ -53,7 +53,7 @@ public class SecurityUtils { public static final String JAAS_CONF_FILENAME = "flink-jaas.conf"; - private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; + public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; @@ -130,6 +130,8 @@ public class SecurityUtils { loginUser = UserGroupInformation.getLoginUser(); } + LOG.info("Hadoop user set to {}", loginUser.toString()); + boolean delegationToken = false; final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN"); Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens(); http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java new file mode 100644 index 0000000..bbea376 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlayTestBase.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +public class ContainerOverlayTestBase { + + private Map<String, String> originalEnvironment; + + @Before + public void before() { + originalEnvironment = new HashMap<>(System.getenv()); + } + + @After + public void after() { + CommonTestUtils.setEnv(originalEnvironment, true); + } + + + /** + * Create an empty file for each given path. + * @param root the root folder in which to create the files. + * @param paths the relative paths to create. + */ + protected static Path[] createPaths(File root, String... paths) throws Exception { + Path[] files = new Path[paths.length]; + for(int i = 0; i < paths.length; i++) { + File file = root.toPath().resolve(paths[i]).toFile(); + file.getParentFile().mkdirs(); + file.createNewFile(); + files[i] = new Path(paths[i]); + } + return files; + } + + /** + * Check that an artifact exists for the given remote path. + */ + protected static ContainerSpecification.Artifact checkArtifact(ContainerSpecification spec, Path remotePath) { + for(ContainerSpecification.Artifact artifact : spec.getArtifacts()) { + if(remotePath.equals(artifact.dest)) { + return artifact; + } + } + throw new AssertionError("no such artifact (" + remotePath + ")"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java new file mode 100644 index 0000000..e77dd3a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; + +import static org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay.TARGET_ROOT; + +public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File binFolder = tempFolder.newFolder("bin"); + File libFolder = tempFolder.newFolder("lib"); + File confFolder = tempFolder.newFolder("conf"); + + Path[] files = createPaths( + tempFolder.getRoot(), + "bin/config.sh", + "bin/taskmanager.sh", + "lib/foo.jar", + "lib/A/foo.jar", + "lib/B/foo.jar", + "lib/B/bar.jar"); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + FlinkDistributionOverlay overlay = new FlinkDistributionOverlay( + binFolder, + confFolder, + libFolder + ); + overlay.configure(containerSpecification); + + for(Path file : files) { + checkArtifact(containerSpecification, new Path(TARGET_ROOT, file.toString())); + } + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + Configuration conf = new Configuration(); + + File binFolder = tempFolder.newFolder("bin"); + File libFolder = tempFolder.newFolder("lib"); + File confFolder = tempFolder.newFolder("conf"); + + // adjust the test environment for the purposes of this test + Map<String, String> map = new HashMap<String, String>(System.getenv()); + map.put(ENV_FLINK_BIN_DIR, binFolder.getAbsolutePath()); + map.put(ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath()); + map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath()); + CommonTestUtils.setEnv(map); + + FlinkDistributionOverlay.Builder builder = FlinkDistributionOverlay.newBuilder().fromEnvironment(conf); + + assertEquals(binFolder.getAbsolutePath(), builder.flinkBinPath.getAbsolutePath()); + assertEquals(libFolder.getAbsolutePath(), builder.flinkLibPath.getAbsolutePath()); + assertEquals(confFolder.getAbsolutePath(), builder.flinkConfPath.getAbsolutePath()); + } + + @Test + public void testBuilderFromEnvironmentBad() throws Exception { + Configuration conf = new Configuration(); + + // adjust the test environment for the purposes of this test + Map<String, String> map = new HashMap<>(System.getenv()); + map.remove(ENV_FLINK_BIN_DIR); + map.remove(ENV_FLINK_LIB_DIR); + map.remove(ENV_FLINK_CONF_DIR); + CommonTestUtils.setEnv(map); + + try { + FlinkDistributionOverlay.Builder builder = FlinkDistributionOverlay.newBuilder().fromEnvironment(conf); + fail(); + } + catch(IllegalStateException e) { + // expected + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java new file mode 100644 index 0000000..c3ea41b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +import static org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay.TARGET_CONF_DIR; + +public class HadoopConfOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File confDir = tempFolder.newFolder(); + initConfDir(confDir); + + HadoopConfOverlay overlay = new HadoopConfOverlay(confDir); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR")); + assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null)); + + checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml")); + checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml")); + } + + @Test + public void testNoConf() throws Exception { + HadoopConfOverlay overlay = new HadoopConfOverlay(null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + // verify that the builder picks up various environment locations + HadoopConfOverlay.Builder builder; + Map<String, String> env; + + // fs.hdfs.hadoopconf + File confDir = tempFolder.newFolder(); + initConfDir(confDir); + Configuration conf = new Configuration(); + conf.setString(ConfigConstants.PATH_HADOOP_CONFIG, confDir.getAbsolutePath()); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(conf); + assertEquals(confDir, builder.hadoopConfDir); + + // HADOOP_CONF_DIR + env = new HashMap<String, String>(System.getenv()); + env.remove("HADOOP_HOME"); + env.put("HADOOP_CONF_DIR", confDir.getAbsolutePath()); + CommonTestUtils.setEnv(env); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration()); + assertEquals(confDir, builder.hadoopConfDir); + + // HADOOP_HOME/conf + File homeDir = tempFolder.newFolder(); + confDir = initConfDir(new File(homeDir, "conf")); + env = new HashMap<String, String>(System.getenv()); + env.remove("HADOOP_CONF_DIR"); + env.put("HADOOP_HOME", homeDir.getAbsolutePath()); + CommonTestUtils.setEnv(env); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration()); + assertEquals(confDir, builder.hadoopConfDir); + + // HADOOP_HOME/etc/hadoop + homeDir = tempFolder.newFolder(); + confDir = initConfDir(new File(homeDir, "etc/hadoop")); + env = new HashMap<String, String>(System.getenv()); + env.remove("HADOOP_CONF_DIR"); + env.put("HADOOP_HOME", homeDir.getAbsolutePath()); + CommonTestUtils.setEnv(env); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration()); + assertEquals(confDir, builder.hadoopConfDir); + } + + private File initConfDir(File confDir) throws Exception { + confDir.mkdirs(); + new File(confDir, "core-site.xml").createNewFile(); + new File(confDir, "hdfs-site.xml").createNewFile(); + return confDir; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java new file mode 100644 index 0000000..7a463b8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlayTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; + +import java.security.PrivilegedAction; + +import static org.junit.Assert.assertEquals; + +public class HadoopUserOverlayTest extends ContainerOverlayTestBase { + + @Test + public void testConfigure() throws Exception { + + final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test"); + + HadoopUserOverlay overlay = new HadoopUserOverlay(ugi); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(ugi.getUserName(), spec.getEnvironmentVariables().get("HADOOP_USER_NAME")); + } + + @Test + public void testNoConf() throws Exception { + HadoopUserOverlay overlay = new HadoopUserOverlay(null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + final Configuration conf = new Configuration(); + final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test"); + + ugi.doAs(new PrivilegedAction<Object>() { + @Override + public Object run() { + try { + HadoopUserOverlay.Builder builder = HadoopUserOverlay.newBuilder().fromEnvironment(conf); + assertEquals(ugi, builder.ugi); + return null; + } + catch(Exception ex) { + throw new AssertionError(ex); + } + } + }); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java new file mode 100644 index 0000000..0570f28 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay.TARGET_PATH; +import static org.junit.Assert.assertEquals; + +public class KeytabOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File keytab = tempFolder.newFile(); + + KeytabOverlay overlay = new KeytabOverlay(keytab); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null)); + checkArtifact(spec, TARGET_PATH); + } + + @Test + public void testNoConf() throws Exception { + KeytabOverlay overlay = new KeytabOverlay((Path) null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + final Configuration conf = new Configuration(); + File keytab = tempFolder.newFile(); + + conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath()); + KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf); + assertEquals(builder.keytabPath, keytab); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java new file mode 100644 index 0000000..1f86b89 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlayTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.JAVA_SECURITY_KRB5_CONF; +import static org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay.TARGET_PATH; +import static org.junit.Assert.assertEquals; + +public class Krb5ConfOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File krb5conf = tempFolder.newFile(); + + Krb5ConfOverlay overlay = new Krb5ConfOverlay(krb5conf); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_PATH.getPath(), spec.getSystemProperties().getString(JAVA_SECURITY_KRB5_CONF, null)); + checkArtifact(spec, TARGET_PATH); + } + + @Test + public void testNoConf() throws Exception { + Krb5ConfOverlay overlay = new Krb5ConfOverlay((Path) null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java new file mode 100644 index 0000000..0894ce6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlayTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_KEYSTORE_PATH; +import static org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay.TARGET_TRUSTSTORE_PATH; +import static org.junit.Assert.assertEquals; + +public class SSLStoreOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File keystore = tempFolder.newFile(); + File truststore = tempFolder.newFile(); + SSLStoreOverlay overlay = new SSLStoreOverlay(keystore, truststore); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_KEYSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null)); + checkArtifact(spec, TARGET_KEYSTORE_PATH); + + assertEquals(TARGET_TRUSTSTORE_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, null)); + checkArtifact(spec, TARGET_TRUSTSTORE_PATH); + } + + @Test + public void testNoConf() throws Exception { + SSLStoreOverlay overlay = new SSLStoreOverlay(null, null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + final Configuration conf = new Configuration(); + File keystore = tempFolder.newFile(); + File truststore = tempFolder.newFile(); + + conf.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, keystore.getAbsolutePath()); + conf.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, truststore.getAbsolutePath()); + + SSLStoreOverlay.Builder builder = SSLStoreOverlay.newBuilder().fromEnvironment(conf); + assertEquals(builder.keystorePath, keystore); + assertEquals(builder.truststorePath, truststore); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java index d318a3c..45c5a77 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java @@ -29,6 +29,9 @@ import java.io.FileWriter; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Map; import static org.junit.Assert.fail; @@ -114,4 +117,40 @@ public class CommonTestUtils { fail("Cannot determine Java version: " + e.getMessage()); } } + + // This code is taken from: http://stackoverflow.com/a/7201825/568695 + // it changes the environment variables of this JVM. Use only for testing purposes! + @SuppressWarnings("unchecked") + public static void setEnv(Map<String, String> newenv) { + try { + Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); + Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment"); + theEnvironmentField.setAccessible(true); + Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null); + env.putAll(newenv); + Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); + theCaseInsensitiveEnvironmentField.setAccessible(true); + Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null); + cienv.putAll(newenv); + } catch (NoSuchFieldException e) { + try { + Class<?>[] classes = Collections.class.getDeclaredClasses(); + Map<String, String> env = System.getenv(); + for (Class<?> cl : classes) { + if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Object obj = field.get(env); + Map<String, String> map = (Map<String, String>) obj; + map.clear(); + map.putAll(newenv); + } + } + } catch (Exception e2) { + throw new RuntimeException(e2); + } + } catch (Exception e1) { + throw new RuntimeException(e1); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index aa5e7d3..804b3d4 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.TestLogger; @@ -58,7 +59,6 @@ import java.io.FileReader; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Field; import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; @@ -542,42 +542,10 @@ public class TestBaseUtils extends TestLogger { return configs; } - // This code is taken from: http://stackoverflow.com/a/7201825/568695 - // it changes the environment variables of this JVM. Use only for testing purposes! - @SuppressWarnings("unchecked") public static void setEnv(Map<String, String> newenv) { - try { - Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); - Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment"); - theEnvironmentField.setAccessible(true); - Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null); - env.putAll(newenv); - Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); - theCaseInsensitiveEnvironmentField.setAccessible(true); - Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null); - cienv.putAll(newenv); - } catch (NoSuchFieldException e) { - try { - Class<?>[] classes = Collections.class.getDeclaredClasses(); - Map<String, String> env = System.getenv(); - for (Class<?> cl : classes) { - if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { - Field field = cl.getDeclaredField("m"); - field.setAccessible(true); - Object obj = field.get(env); - Map<String, String> map = (Map<String, String>) obj; - map.clear(); - map.putAll(newenv); - } - } - } catch (Exception e2) { - throw new RuntimeException(e2); - } - } catch (Exception e1) { - throw new RuntimeException(e1); - } + CommonTestUtils.setEnv(newenv); } - + private static ExecutionContext defaultExecutionContext() { return ExecutionContext$.MODULE$.global(); }