http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCore.java new file mode 100644 index 0000000..9767430 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCore.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers; + +import org.apache.hadoop.conf.Configuration; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTree; +import org.apache.slider.core.exceptions.SliderException; + +import java.util.List; +public interface ProviderCore { + + String getName(); + + List<ProviderRole> getRoles(); + + Configuration getConf(); + + /** + * Verify that an instance definition is considered valid by the provider + * @param instanceDefinition instance definition + * @throws SliderException if the configuration is not valid + */ + void validateInstanceDefinition(AggregateConf instanceDefinition) throws + SliderException; + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java new file mode 100644 index 0000000..761ac0f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers; + +import org.apache.slider.api.ResourceKeys; + +/** + * Provider role and key for use in app requests. + * + * This class uses the role name as the key for hashes and in equality tests, + * and ignores the other values. + */ +public final class ProviderRole { + public final String name; + public final String group; + public final int id; + public int placementPolicy; + public int nodeFailureThreshold; + public final long placementTimeoutSeconds; + public final String labelExpression; + + public ProviderRole(String name, int id) { + this(name, + name, + id, + PlacementPolicy.DEFAULT, + ResourceKeys.DEFAULT_NODE_FAILURE_THRESHOLD, + ResourceKeys.DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS, + ResourceKeys.DEF_YARN_LABEL_EXPRESSION); + } + + /** + * Create a provider role + * @param name role/component name + * @param id ID. This becomes the YARN priority + * @param policy placement policy + * @param nodeFailureThreshold threshold for node failures (within a reset interval) + * after which a node failure is considered an app failure + * @param placementTimeoutSeconds for lax placement, timeout in seconds before + * @param labelExpression label expression for requests; may be null + */ + public ProviderRole(String name, + int id, + int policy, + int nodeFailureThreshold, + long placementTimeoutSeconds, + String labelExpression) { + this(name, + name, + id, + policy, + nodeFailureThreshold, + placementTimeoutSeconds, + labelExpression); + } + + /** + * Create a provider role with a role group + * @param name role/component name + * @param group role/component group + * @param id ID. This becomes the YARN priority + * @param policy placement policy + * @param nodeFailureThreshold threshold for node failures (within a reset interval) + * after which a node failure is considered an app failure + * @param placementTimeoutSeconds for lax placement, timeout in seconds before + * @param labelExpression label expression for requests; may be null + */ + public ProviderRole(String name, + String group, + int id, + int policy, + int nodeFailureThreshold, + long placementTimeoutSeconds, + String labelExpression) { + this.name = name; + if (group == null) { + this.group = name; + } else { + this.group = group; + } + this.id = id; + this.placementPolicy = policy; + this.nodeFailureThreshold = nodeFailureThreshold; + this.placementTimeoutSeconds = placementTimeoutSeconds; + this.labelExpression = labelExpression; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ProviderRole that = (ProviderRole) o; + return name.equals(that.name); + } + + @Override + public int hashCode() { + return name.hashCode(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ProviderRole{"); + sb.append("name='").append(name).append('\''); + sb.append(", group=").append(group); + sb.append(", id=").append(id); + sb.append(", placementPolicy=").append(placementPolicy); + sb.append(", nodeFailureThreshold=").append(nodeFailureThreshold); + sb.append(", placementTimeoutSeconds=").append(placementTimeoutSeconds); + sb.append(", labelExpression='").append(labelExpression).append('\''); + sb.append('}'); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java new file mode 100644 index 0000000..f754eee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderService.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.slider.api.ClusterDescription; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.MapOperations; +import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.launch.ContainerLauncher; +import org.apache.slider.core.main.ExitCodeProvider; +import org.apache.slider.server.appmaster.actions.QueueAccess; +import org.apache.slider.server.appmaster.operations.RMOperationHandlerActions; +import org.apache.slider.server.appmaster.state.ContainerReleaseSelector; +import org.apache.slider.server.appmaster.state.StateAccessForProviders; +import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations; +import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Map; + +public interface ProviderService extends ProviderCore, + Service, + RMOperationHandlerActions, + ExitCodeProvider { + + /** + * Set up the entire container launch context + * @param containerLauncher + * @param instanceDefinition + * @param container + * @param providerRole + * @param sliderFileSystem + * @param generatedConfPath + * @param appComponent + * @param containerTmpDirPath + */ + void buildContainerLaunchContext(ContainerLauncher containerLauncher, + AggregateConf instanceDefinition, + Container container, + ProviderRole providerRole, + SliderFileSystem sliderFileSystem, + Path generatedConfPath, + MapOperations resourceComponent, + MapOperations appComponent, + Path containerTmpDirPath) throws + IOException, + SliderException; + + /** + * Notify the providers of container completion + * @param containerId container that has completed + */ + void notifyContainerCompleted(ContainerId containerId); + + /** + * Execute a process in the AM + * @param instanceDefinition cluster description + * @param confDir configuration directory + * @param env environment + * @param execInProgress the callback for the exec events + * @return true if a process was actually started + * @throws IOException + * @throws SliderException + */ + boolean exec(AggregateConf instanceDefinition, + File confDir, + Map<String, String> env, + ProviderCompleted execInProgress) throws IOException, + SliderException; + + /** + * Scan through the roles and see if it is supported. + * @param role role to look for + * @return true if the role is known about -and therefore + * that a launcher thread can be deployed to launch it + */ + boolean isSupportedRole(String role); + + /** + * Load a specific XML configuration file for the provider config + * @param confDir configuration directory + * @return a configuration to be included in status + * @throws BadCommandArgumentsException + * @throws IOException + */ + Configuration loadProviderConfigurationInformation(File confDir) + throws BadCommandArgumentsException, IOException; + + /** + * The application configuration should be initialized here + * + * @param instanceDefinition + * @param fileSystem + * @throws IOException + * @throws SliderException + */ + void initializeApplicationConfiguration(AggregateConf instanceDefinition, + SliderFileSystem fileSystem) throws IOException, SliderException; + + /** + * This is a validation of the application configuration on the AM. + * Here is where things like the existence of keytabs and other + * not-seen-client-side properties can be tested, before + * the actual process is spawned. + * @param instanceDefinition clusterSpecification + * @param confDir configuration directory + * @param secure flag to indicate that secure mode checks must exist + * @throws IOException IO problemsn + * @throws SliderException any failure + */ + void validateApplicationConfiguration(AggregateConf instanceDefinition, + File confDir, + boolean secure + ) throws IOException, SliderException; + + /* + * Build the provider status, can be empty + * @return the provider status - map of entries to add to the info section + */ + Map<String, String> buildProviderStatus(); + + /** + * Build a map of data intended for the AM webapp that is specific + * about this provider. The key is some text to be displayed, and the + * value can be a URL that will create an anchor over the key text. + * + * If no anchor is needed/desired, insert the key with a null value. + * @return the details + */ + Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterSpec); + + /** + * Get a human friendly name for web UIs and messages + * @return a name string. Default is simply the service instance name. + */ + String getHumanName(); + + public void bind(StateAccessForProviders stateAccessor, + QueueAccess queueAccess, + List<Container> liveContainers); + + /** + * Bind to the YARN registry + * @param yarnRegistry YARN registry + */ + void bindToYarnRegistry(YarnRegistryViewForProviders yarnRegistry); + + /** + * Returns the agent rest operations interface. + * @return the interface if available, null otherwise. + */ + AgentRestOperations getAgentRestOperations(); + + /** + * Build up the endpoint details for this service + * @param details + */ + void buildEndpointDetails(Map<String, MonitorDetail> details); + + /** + * Prior to going live -register the initial service registry data + * @param amWebURI URL to the AM. This may be proxied, so use relative paths + * @param agentOpsURI URI for agent operations. This will not be proxied + * @param agentStatusURI URI For agent status. Again: no proxy + * @param serviceRecord service record to build up + */ + void applyInitialRegistryDefinitions(URL amWebURI, + URL agentOpsURI, + URL agentStatusURI, + ServiceRecord serviceRecord) + throws IOException; + + /** + * Create the container release selector for this provider...any policy + * can be implemented + * @return the selector to use for choosing containers. + */ + ContainerReleaseSelector createContainerReleaseSelector(); + + /** + * On AM restart (for whatever reason) this API is required to rebuild the AM + * internal state with the containers which were already assigned and running + * + * @param liveContainers + * @param applicationId + * @param providerRoles + */ + void rebuildContainerDetails(List<Container> liveContainers, + String applicationId, Map<Integer, ProviderRole> providerRoles); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java new file mode 100644 index 0000000..07d106b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderUtils.java @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.slider.api.ClusterDescription; +import org.apache.slider.api.InternalKeys; +import org.apache.slider.api.OptionKeys; +import org.apache.slider.api.ResourceKeys; +import org.apache.slider.api.RoleKeys; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.conf.MapOperations; +import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.exceptions.SliderInternalStateException; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * this is a factoring out of methods handy for providers. It's bonded to a log at + * construction time + */ +public class ProviderUtils implements RoleKeys { + + protected final Logger log; + + /** + * Create an instace + * @param log log directory to use -usually the provider + */ + + public ProviderUtils(Logger log) { + this.log = log; + } + + /** + * Add oneself to the classpath. This does not work + * on minicluster test runs where the JAR is not built up + * @param providerResources map of provider resources to add these entries to + * @param provider provider to add + * @param jarName name of the jar to use + * @param sliderFileSystem target filesystem + * @param tempPath path in the cluster FS for temp files + * @param libdir relative directory to place resources + * @param miniClusterTestRun + * @return true if the class was found in a JAR + * + * @throws FileNotFoundException if the JAR was not found and this is NOT + * a mini cluster test run + * @throws IOException IO problems + * @throws SliderException any Slider problem + */ + public static boolean addProviderJar(Map<String, LocalResource> providerResources, + Object provider, + String jarName, + SliderFileSystem sliderFileSystem, + Path tempPath, + String libdir, + boolean miniClusterTestRun) throws + IOException, + SliderException { + try { + SliderUtils.putJar(providerResources, + sliderFileSystem, + provider.getClass(), + tempPath, + libdir, + jarName); + return true; + } catch (FileNotFoundException e) { + if (miniClusterTestRun) { + return false; + } else { + throw e; + } + } + } + + /** + * Add/overwrite the agent tarball (overwritten every time application is restarted) + * @param provider + * @param tarName + * @param sliderFileSystem + * @param agentDir + * @return true the location could be determined and the file added + * @throws IOException + */ + public static boolean addAgentTar(Object provider, + String tarName, + SliderFileSystem sliderFileSystem, + Path agentDir) throws + IOException { + File localFile = SliderUtils.findContainingJar(provider.getClass()); + if(localFile != null) { + String parentDir = localFile.getParent(); + Path agentTarPath = new Path(parentDir, tarName); + sliderFileSystem.getFileSystem().copyFromLocalFile(false, true, agentTarPath, agentDir); + return true; + } + return false; + } + + /** + * Add a set of dependencies to the provider resources being built up, + * by copying them from the local classpath to the remote one, then + * registering them + * @param providerResources map of provider resources to add these entries to + * @param sliderFileSystem target filesystem + * @param tempPath path in the cluster FS for temp files + * @param libdir relative directory to place resources + * @param resources list of resource names (e.g. "hbase.jar" + * @param classes list of classes where classes[i] refers to a class in + * resources[i] + * @throws IOException IO problems + * @throws SliderException any Slider problem + */ + public static void addDependencyJars(Map<String, LocalResource> providerResources, + SliderFileSystem sliderFileSystem, + Path tempPath, + String libdir, + String[] resources, + Class[] classes + ) throws + IOException, + SliderException { + if (resources.length != classes.length) { + throw new SliderInternalStateException( + "mismatch in Jar names [%d] and classes [%d]", + resources.length, + classes.length); + } + int size = resources.length; + for (int i = 0; i < size; i++) { + String jarName = resources[i]; + Class clazz = classes[i]; + SliderUtils.putJar(providerResources, + sliderFileSystem, + clazz, + tempPath, + libdir, + jarName); + } + + } + + /** + * Loads all dependency jars from the default path + * @param providerResources map of provider resources to add these entries to + * @param sliderFileSystem target filesystem + * @param tempPath path in the cluster FS for temp files + * @param libDir relative directory to place resources + * @param libLocalSrcDir explicitly supplied local libs dir + * @throws IOException + * @throws SliderException + */ + public static void addAllDependencyJars(Map<String, LocalResource> providerResources, + SliderFileSystem sliderFileSystem, + Path tempPath, + String libDir, + String libLocalSrcDir) + throws IOException, SliderException { + String libSrcToUse = libLocalSrcDir; + if (SliderUtils.isSet(libLocalSrcDir)) { + File file = new File(libLocalSrcDir); + if (!file.exists() || !file.isDirectory()) { + throw new BadCommandArgumentsException("Supplied lib src dir %s is not valid", libLocalSrcDir); + } + } + SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath, libDir, libSrcToUse); + } + + /** + * build the log directory + * @return the log dir + */ + public String getLogdir() throws IOException { + String logdir = System.getenv("LOGDIR"); + if (logdir == null) { + logdir = + SliderKeys.TMP_LOGDIR_PREFIX + UserGroupInformation.getCurrentUser().getShortUserName(); + } + return logdir; + } + + + public void validateNodeCount(AggregateConf instanceDescription, + String name, int min, int max) throws + BadCommandArgumentsException { + MapOperations component = + instanceDescription.getResourceOperations().getComponent(name); + int count; + if (component == null) { + count = 0; + } else { + count = component.getOptionInt(ResourceKeys.COMPONENT_INSTANCES, 0); + } + validateNodeCount(name, count, min, max); + } + + /** + * Validate the node count and heap size values of a node class + * <p> + * If max <= 0: min <= count + * If max > 0: min <= count <= max + * @param name node class name + * @param count requested node count + * @param min requested heap size + * @param max maximum value. + * @throws BadCommandArgumentsException if the values are out of range + */ + public void validateNodeCount(String name, + int count, + int min, + int max) throws BadCommandArgumentsException { + if (count < min) { + throw new BadCommandArgumentsException( + "requested no of %s nodes: %d is below the minimum of %d", name, count, + min); + } + if (max > 0 && count > max) { + throw new BadCommandArgumentsException( + "requested no of %s nodes: %d is above the maximum of %d", name, count, + max); + } + } + + /** + * copy all options beginning site. into the site.xml + * @param clusterSpec cluster specification + * @param sitexml map for XML file to build up + */ + public void propagateSiteOptions(ClusterDescription clusterSpec, + Map<String, String> sitexml) { + Map<String, String> options = clusterSpec.options; + propagateSiteOptions(options, sitexml); + } + + public void propagateSiteOptions(Map<String, String> options, + Map<String, String> sitexml) { + propagateSiteOptions(options, sitexml, ""); + } + + public void propagateSiteOptions(Map<String, String> options, + Map<String, String> sitexml, + String configName) { + propagateSiteOptions(options, sitexml, configName, null); + } + + public void propagateSiteOptions(Map<String, String> options, + Map<String, String> sitexml, + String configName, + Map<String,String> tokenMap) { + String prefix = OptionKeys.SITE_XML_PREFIX + + (!configName.isEmpty() ? configName + "." : ""); + for (Map.Entry<String, String> entry : options.entrySet()) { + String key = entry.getKey(); + if (key.startsWith(prefix)) { + String envName = key.substring(prefix.length()); + if (!envName.isEmpty()) { + String value = entry.getValue(); + if (tokenMap != null) { + for (Map.Entry<String,String> token : tokenMap.entrySet()) { + value = value.replaceAll(Pattern.quote(token.getKey()), + token.getValue()); + } + } + sitexml.put(envName, value); + } + } + } + } + + /** + * Propagate an option from the cluster specification option map + * to the site XML map, using the site key for the name + * @param global global config spec + * @param optionKey key in the option map + * @param sitexml map for XML file to build up + * @param siteKey key to assign the value to in the site XML + * @throws BadConfigException if the option is missing from the cluster spec + */ + public void propagateOption(MapOperations global, + String optionKey, + Map<String, String> sitexml, + String siteKey) throws BadConfigException { + sitexml.put(siteKey, global.getMandatoryOption(optionKey)); + } + + + /** + * Build the image dir. This path is relative and only valid at the far end + * @param instanceDefinition instance definition + * @param bindir bin subdir + * @param script script in bin subdir + * @return the path to the script + * @throws FileNotFoundException if a file is not found, or it is not a directory* + */ + public String buildPathToHomeDir(AggregateConf instanceDefinition, + String bindir, + String script) throws + FileNotFoundException, + BadConfigException { + MapOperations globalOptions = + instanceDefinition.getInternalOperations().getGlobalOptions(); + String applicationHome = + globalOptions.get(InternalKeys.INTERNAL_APPLICATION_HOME); + String imagePath = + globalOptions.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); + return buildPathToHomeDir(imagePath, applicationHome, bindir, script); + } + + public String buildPathToHomeDir(String imagePath, + String applicationHome, + String bindir, String script) throws + FileNotFoundException { + String path; + File scriptFile; + if (imagePath != null) { + File tarball = new File(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR); + scriptFile = findBinScriptInExpandedArchive(tarball, bindir, script); + // now work back from the script to build the relative path + // to the binary which will be valid remote or local + StringBuilder builder = new StringBuilder(); + builder.append(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR); + builder.append("/"); + //for the script, we want the name of ../.. + File archive = scriptFile.getParentFile().getParentFile(); + builder.append(archive.getName()); + path = builder.toString(); + + } else { + // using a home directory which is required to be present on + // the local system -so will be absolute and resolvable + File homedir = new File(applicationHome); + path = homedir.getAbsolutePath(); + + //this is absolute, resolve its entire path + SliderUtils.verifyIsDir(homedir, log); + File bin = new File(homedir, bindir); + SliderUtils.verifyIsDir(bin, log); + scriptFile = new File(bin, script); + SliderUtils.verifyFileExists(scriptFile, log); + } + return path; + } + + + /** + * Build the image dir. This path is relative and only valid at the far end + * @param instance instance options + * @param bindir bin subdir + * @param script script in bin subdir + * @return the path to the script + * @throws FileNotFoundException if a file is not found, or it is not a directory* + */ + public String buildPathToScript(AggregateConf instance, + String bindir, + String script) throws FileNotFoundException { + return buildPathToScript(instance.getInternalOperations(), bindir, script); + } + /** + * Build the image dir. This path is relative and only valid at the far end + * @param internal internal options + * @param bindir bin subdir + * @param script script in bin subdir + * @return the path to the script + * @throws FileNotFoundException if a file is not found, or it is not a directory* + */ + public String buildPathToScript(ConfTreeOperations internal, + String bindir, + String script) throws FileNotFoundException { + + String homedir = buildPathToHomeDir( + internal.get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH), + internal.get(InternalKeys.INTERNAL_APPLICATION_HOME), + bindir, + script); + return buildScriptPath(bindir, script, homedir); + } + + + + public String buildScriptPath(String bindir, String script, String homedir) { + StringBuilder builder = new StringBuilder(homedir); + builder.append("/"); + builder.append(bindir); + builder.append("/"); + builder.append(script); + return builder.toString(); + } + + + public static String convertToAppRelativePath(File file) { + return convertToAppRelativePath(file.getPath()); + } + + public static String convertToAppRelativePath(String path) { + return ApplicationConstants.Environment.PWD.$() + "/" + path; + } + + + public static void validatePathReferencesLocalDir(String meaning, String path) + throws BadConfigException { + File file = new File(path); + if (!file.exists()) { + throw new BadConfigException("%s directory %s not found", meaning, file); + } + if (!file.isDirectory()) { + throw new BadConfigException("%s is not a directory: %s", meaning, file); + } + } + + /** + * get the user name + * @return the user name + */ + public String getUserName() throws IOException { + return UserGroupInformation.getCurrentUser().getShortUserName(); + } + + /** + * Find a script in an expanded archive + * @param base base directory + * @param bindir bin subdir + * @param script script in bin subdir + * @return the path to the script + * @throws FileNotFoundException if a file is not found, or it is not a directory + */ + public File findBinScriptInExpandedArchive(File base, + String bindir, + String script) + throws FileNotFoundException { + + SliderUtils.verifyIsDir(base, log); + File[] ls = base.listFiles(); + if (ls == null) { + //here for the IDE to be happy, as the previous check will pick this case + throw new FileNotFoundException("Failed to list directory " + base); + } + + log.debug("Found {} entries in {}", ls.length, base); + List<File> directories = new LinkedList<File>(); + StringBuilder dirs = new StringBuilder(); + for (File file : ls) { + log.debug("{}", false); + if (file.isDirectory()) { + directories.add(file); + dirs.append(file.getPath()).append(" "); + } + } + if (directories.size() > 1) { + throw new FileNotFoundException( + "Too many directories in archive to identify binary: " + dirs); + } + if (directories.isEmpty()) { + throw new FileNotFoundException( + "No directory found in archive " + base); + } + File archive = directories.get(0); + File bin = new File(archive, bindir); + SliderUtils.verifyIsDir(bin, log); + File scriptFile = new File(bin, script); + SliderUtils.verifyFileExists(scriptFile, log); + return scriptFile; + } + + /** + * Return any additional arguments (argv) to provide when starting this role + * + * @param roleOptions + * The options for this role + * @return A non-null String which contains command line arguments for this role, or the empty string. + */ + public static String getAdditionalArgs(Map<String,String> roleOptions) { + if (roleOptions.containsKey(RoleKeys.ROLE_ADDITIONAL_ARGS)) { + String additionalArgs = roleOptions.get(RoleKeys.ROLE_ADDITIONAL_ARGS); + if (null != additionalArgs) { + return additionalArgs; + } + } + + return ""; + } + + public int getRoleResourceRequirement(String val, + int defVal, + int maxVal) { + if (val==null) { + val = Integer.toString(defVal); + } + Integer intVal; + if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) { + intVal = maxVal; + } else { + intVal = Integer.decode(val); + } + return intVal; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/SliderProviderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/SliderProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/SliderProviderFactory.java new file mode 100644 index 0000000..5dd4a32 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/SliderProviderFactory.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.SliderXmlConfKeys; +import org.apache.slider.core.exceptions.BadClusterStateException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.providers.agent.AgentKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for factories + */ +public abstract class SliderProviderFactory extends Configured { + + public static final String DEFAULT_CLUSTER_TYPE = AgentKeys.PROVIDER_AGENT; + + protected static final Logger log = + LoggerFactory.getLogger(SliderProviderFactory.class); + public static final String PROVIDER_NOT_FOUND = + "Unable to find provider of application type %s"; + + public SliderProviderFactory(Configuration conf) { + super(conf); + } + + protected SliderProviderFactory() { + } + + public abstract AbstractClientProvider createClientProvider(); + + public abstract ProviderService createServerProvider(); + + /** + * Create a provider for a specific application + * @param application app + * @return app instance + * @throws SliderException on any instantiation problem + */ + public static SliderProviderFactory createSliderProviderFactory(String application) throws + SliderException { + Configuration conf = loadSliderConfiguration(); + if (application == null) { + application = DEFAULT_CLUSTER_TYPE; + } + String providerKey = + String.format(SliderXmlConfKeys.KEY_PROVIDER, application); + if (application.contains(".")) { + log.debug("Treating {} as a classname", application); + String name = "classname.key"; + conf.set(name, application); + providerKey = name; + } + + Class<? extends SliderProviderFactory> providerClass; + try { + providerClass = conf.getClass(providerKey, null, SliderProviderFactory.class); + } catch (RuntimeException e) { + throw new BadClusterStateException(e, "Failed to load provider %s: %s", application, e); + } + if (providerClass == null) { + throw new BadClusterStateException(PROVIDER_NOT_FOUND, application); + } + + Exception ex; + try { + SliderProviderFactory providerFactory = providerClass.newInstance(); + providerFactory.setConf(conf); + return providerFactory; + } catch (Exception e) { + ex = e; + } + //by here the operation failed and ex is set to the value + throw new BadClusterStateException(ex, + "Failed to create an instance of %s : %s", + providerClass, + ex); + } + + /** + * Load a configuration with the {@link SliderKeys#SLIDER_XML} resource + * included + * @return a configuration instance + */ + public static Configuration loadSliderConfiguration() { + Configuration conf = new Configuration(); + conf.addResource(SliderKeys.SLIDER_XML); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java new file mode 100644 index 0000000..4c6a50b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentClientProvider.java @@ -0,0 +1,701 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers.agent; + +import com.google.common.io.Files; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.slider.api.InternalKeys; +import org.apache.slider.api.ResourceKeys; +import org.apache.slider.client.ClientUtils; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.conf.MapOperations; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.launch.AbstractLauncher; +import org.apache.slider.core.registry.docstore.PublishedConfiguration; +import org.apache.slider.providers.AbstractClientProvider; +import org.apache.slider.providers.ProviderRole; +import org.apache.slider.providers.ProviderUtils; +import org.apache.slider.providers.agent.application.metadata.Application; +import org.apache.slider.providers.agent.application.metadata.Component; +import org.apache.slider.providers.agent.application.metadata.ConfigFile; +import org.apache.slider.providers.agent.application.metadata.Metainfo; +import org.apache.slider.providers.agent.application.metadata.MetainfoParser; +import org.apache.slider.providers.agent.application.metadata.OSPackage; +import org.apache.slider.providers.agent.application.metadata.OSSpecific; +import org.apache.slider.providers.agent.application.metadata.Package; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +/** This class implements the client-side aspects of the agent deployer */ +public class AgentClientProvider extends AbstractClientProvider + implements AgentKeys, SliderKeys { + + + protected static final Logger log = + LoggerFactory.getLogger(AgentClientProvider.class); + protected static final String NAME = "agent"; + private static final ProviderUtils providerUtils = new ProviderUtils(log); + public static final String E_COULD_NOT_READ_METAINFO + = "Not a valid app package. Could not read metainfo."; + + protected Map<String, Metainfo> metaInfoMap = new ConcurrentHashMap<String, Metainfo>(); + + protected AgentClientProvider(Configuration conf) { + super(conf); + } + + @Override + public String getName() { + return NAME; + } + + @Override + public List<ProviderRole> getRoles() { + return AgentRoles.getRoles(); + } + + @Override //Client + public void preflightValidateClusterConfiguration(SliderFileSystem sliderFileSystem, + String clustername, + Configuration configuration, + AggregateConf instanceDefinition, + Path clusterDirPath, + Path generatedConfDirPath, + boolean secure) throws + SliderException, + IOException { + super.preflightValidateClusterConfiguration(sliderFileSystem, clustername, + configuration, + instanceDefinition, + clusterDirPath, + generatedConfDirPath, secure); + + String appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition + .getAppConfOperations()); + Path appDefPath = new Path(appDef); + sliderFileSystem.verifyFileExists(appDefPath); + + String agentConf = instanceDefinition.getAppConfOperations(). + getGlobalOptions().getOption(AgentKeys.AGENT_CONF, ""); + if (StringUtils.isNotEmpty(agentConf)) { + sliderFileSystem.verifyFileExists(new Path(agentConf)); + } + + String appHome = instanceDefinition.getAppConfOperations(). + getGlobalOptions().get(AgentKeys.PACKAGE_PATH); + if (SliderUtils.isUnset(appHome)) { + String agentImage = instanceDefinition.getInternalOperations(). + get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); + sliderFileSystem.verifyFileExists(new Path(agentImage)); + } + } + + @Override + public void validateInstanceDefinition(AggregateConf instanceDefinition, SliderFileSystem fs) throws + SliderException { + super.validateInstanceDefinition(instanceDefinition, fs); + log.debug("Validating conf {}", instanceDefinition); + ConfTreeOperations resources = + instanceDefinition.getResourceOperations(); + + providerUtils.validateNodeCount(instanceDefinition, ROLE_NODE, + 0, -1); + + String appDef = null; + try { + // Validate the app definition + appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition + .getAppConfOperations()); + } catch (BadConfigException bce) { + throw new BadConfigException("Application definition must be provided. " + bce.getMessage()); + } + + log.info("Validating app definition {}", appDef); + String extension = appDef.substring(appDef.lastIndexOf(".") + 1, appDef.length()); + if (!"zip".equals(extension.toLowerCase(Locale.ENGLISH))) { + throw new BadConfigException("App definition must be packaged as a .zip file. File provided is " + appDef); + } + + Set<String> names = resources.getComponentNames(); + names.remove(SliderKeys.COMPONENT_AM); + Map<Integer, String> priorityMap = new HashMap<Integer, String>(); + + Metainfo metaInfo = getMetainfo(fs, appDef); + + for (String name : names) { + MapOperations component = resources.getMandatoryComponent(name); + + if (metaInfo != null) { + Component componentDef = metaInfo.getApplicationComponent(name); + if (componentDef == null) { + throw new BadConfigException( + "Component %s is not a member of application.", name); + } + } + + int priority = + component.getMandatoryOptionInt(ResourceKeys.COMPONENT_PRIORITY); + if (priority <= 0) { + throw new BadConfigException("Component %s %s value out of range %d", + name, + ResourceKeys.COMPONENT_PRIORITY, + priority); + } + + String existing = priorityMap.get(priority); + if (existing != null) { + throw new BadConfigException( + "Component %s has a %s value %d which duplicates that of %s", + name, + ResourceKeys.COMPONENT_PRIORITY, + priority, + existing); + } + priorityMap.put(priority, name); + } + + // fileSystem may be null for tests + if (metaInfo != null) { + for (String name : names) { + Component componentDef = metaInfo.getApplicationComponent(name); + if (componentDef == null) { + throw new BadConfigException( + "Component %s is not a member of application.", name); + } + + // ensure that intance count is 0 for client components + if ("CLIENT".equals(componentDef.getCategory())) { + MapOperations componentConfig = resources.getMandatoryComponent(name); + int count = + componentConfig.getMandatoryOptionInt(ResourceKeys.COMPONENT_INSTANCES); + if (count > 0) { + throw new BadConfigException("Component %s is of type CLIENT and cannot be instantiated." + + " Use \"slider client install ...\" command instead.", + name); + } + } else { + MapOperations componentConfig = resources.getMandatoryComponent(name); + int count = + componentConfig.getMandatoryOptionInt(ResourceKeys.COMPONENT_INSTANCES); + int definedMinCount = componentDef.getMinInstanceCountInt(); + int definedMaxCount = componentDef.getMaxInstanceCountInt(); + if (count < definedMinCount || count > definedMaxCount) { + throw new BadConfigException("Component %s, %s value %d out of range. " + + "Expected minimum is %d and maximum is %d", + name, + ResourceKeys.COMPONENT_INSTANCES, + count, + definedMinCount, + definedMaxCount); + } + } + } + } + } + + + @Override + public void prepareAMAndConfigForLaunch(SliderFileSystem fileSystem, + Configuration serviceConf, + AbstractLauncher launcher, + AggregateConf instanceDefinition, + Path snapshotConfDirPath, + Path generatedConfDirPath, + Configuration clientConfExtras, + String libdir, + Path tempPath, + boolean miniClusterTestRun) throws + IOException, + SliderException { + String agentImage = instanceDefinition.getInternalOperations(). + get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH); + if (SliderUtils.isUnset(agentImage)) { + Path agentPath = new Path(tempPath.getParent(), AgentKeys.PROVIDER_AGENT); + log.info("Automatically uploading the agent tarball at {}", agentPath); + fileSystem.getFileSystem().mkdirs(agentPath); + if (ProviderUtils.addAgentTar(this, AGENT_TAR, fileSystem, agentPath)) { + instanceDefinition.getInternalOperations().set( + InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH, + new Path(agentPath, AGENT_TAR).toUri()); + } + } + } + + @Override + public Set<String> getApplicationTags(SliderFileSystem fileSystem, + String appDef) throws SliderException { + Set<String> tags; + Metainfo metaInfo = getMetainfo(fileSystem, appDef); + + if (metaInfo == null) { + log.error("Error retrieving metainfo from {}", appDef); + throw new SliderException("Error parsing metainfo file, possibly bad structure."); + } + + Application application = metaInfo.getApplication(); + tags = new HashSet<String>(); + tags.add("Name: " + application.getName()); + tags.add("Version: " + application.getVersion()); + tags.add("Description: " + SliderUtils.truncate(application.getComment(), 80)); + + return tags; + } + + @Override + public void processClientOperation(SliderFileSystem fileSystem, + RegistryOperations rops, + Configuration configuration, + String operation, + File clientInstallPath, + File appPackage, + JSONObject config, + String name) throws SliderException { + // create temp folder + // create sub-folders app_pkg, agent_pkg, command + File tmpDir = Files.createTempDir(); + log.info("Command is being executed at {}", tmpDir.getAbsolutePath()); + File appPkgDir = new File(tmpDir, "app_pkg"); + appPkgDir.mkdir(); + + File agentPkgDir = new File(tmpDir, "agent_pkg"); + agentPkgDir.mkdir(); + + File cmdDir = new File(tmpDir, "command"); + cmdDir.mkdir(); + + Metainfo metaInfo = null; + JSONObject defaultConfig = null; + try { + // expand app package into /app_pkg + ZipInputStream zipInputStream = null; + try { + zipInputStream = new ZipInputStream(new FileInputStream(appPackage)); + { + ZipEntry zipEntry = zipInputStream.getNextEntry(); + while (zipEntry != null) { + log.info("Processing {}", zipEntry.getName()); + String filePath = appPkgDir + File.separator + zipEntry.getName(); + if (!zipEntry.isDirectory()) { + log.info("Extracting file {}", filePath); + extractFile(zipInputStream, filePath); + + if ("metainfo.xml".equals(zipEntry.getName())) { + FileInputStream input = null; + try { + input = new FileInputStream(filePath); + metaInfo = new MetainfoParser().fromXmlStream(input); + } finally { + IOUtils.closeStream(input); + } + } else if ("metainfo.json".equals(zipEntry.getName())) { + FileInputStream input = null; + try { + input = new FileInputStream(filePath); + metaInfo = new MetainfoParser().fromJsonStream(input); + } finally { + IOUtils.closeStream(input); + } + } else if ("clientInstallConfig-default.json".equals(zipEntry.getName())) { + try { + defaultConfig = new JSONObject(FileUtils.readFileToString(new File(filePath), Charset.defaultCharset())); + } catch (JSONException jex) { + throw new SliderException("Unable to read default client config.", jex); + } + } + } else { + log.info("Creating dir {}", filePath); + File dir = new File(filePath); + dir.mkdir(); + } + zipInputStream.closeEntry(); + zipEntry = zipInputStream.getNextEntry(); + } + } + } finally { + zipInputStream.close(); + } + + if (metaInfo == null) { + throw new BadConfigException(E_COULD_NOT_READ_METAINFO); + } + + String clientScript = null; + String clientComponent = null; + for (Component component : metaInfo.getApplication().getComponents()) { + if (component.getCategory().equals("CLIENT")) { + clientComponent = component.getName(); + if (component.getCommandScript() != null) { + clientScript = component.getCommandScript().getScript(); + } + break; + } + } + + if (SliderUtils.isUnset(clientScript)) { + log.info("Installing CLIENT without script"); + List<Package> packages = metaInfo.getApplication().getPackages(); + if (packages.size() > 0) { + // retrieve package resources from HDFS and extract + for (Package pkg : packages) { + Path pkgPath = fileSystem.buildResourcePath(pkg.getName()); + if (!fileSystem.isFile(pkgPath) && name != null) { + pkgPath = fileSystem.buildResourcePath(name, pkg.getName()); + } + if (!fileSystem.isFile(pkgPath)) { + throw new IOException("Package doesn't exist as a resource: " + + pkg.getName()); + } + if ("archive".equals(pkg.getType())) { + File pkgFile = new File(tmpDir, pkg.getName()); + fileSystem.copyHdfsFileToLocal(pkgPath, pkgFile); + expandTar(pkgFile, clientInstallPath); + } else { + File pkgFile = new File(clientInstallPath, pkg.getName()); + fileSystem.copyHdfsFileToLocal(pkgPath, pkgFile); + } + } + } else { + // extract tarball from app def + for (OSSpecific osSpecific : metaInfo.getApplication() + .getOSSpecifics()) { + for (OSPackage pkg : osSpecific.getPackages()) { + if ("tarball".equals(pkg.getType())) { + File pkgFile = new File(appPkgDir, pkg.getName()); + expandTar(pkgFile, clientInstallPath); + } + } + } + } + if (name == null) { + log.warn("Conf files not being generated because no app name was " + + "provided"); + return; + } + File confInstallDir; + String clientRoot = null; + if (config != null) { + try { + clientRoot = config.getJSONObject("global") + .getString(AgentKeys.APP_CLIENT_ROOT); + } catch (JSONException e) { + log.info("Couldn't read {} from provided client config, falling " + + "back on default", AgentKeys.APP_CLIENT_ROOT); + } + } + if (clientRoot == null && defaultConfig != null) { + try { + clientRoot = defaultConfig.getJSONObject("global") + .getString(AgentKeys.APP_CLIENT_ROOT); + } catch (JSONException e) { + log.info("Couldn't read {} from default client config, using {}", + AgentKeys.APP_CLIENT_ROOT, clientInstallPath); + } + } + if (clientRoot == null) { + confInstallDir = clientInstallPath; + } else { + confInstallDir = new File(new File(clientInstallPath, clientRoot), "conf"); + if (!confInstallDir.exists()) { + confInstallDir.mkdirs(); + } + } + String user = RegistryUtils.currentUser(); + for (ConfigFile configFile : metaInfo.getComponentConfigFiles(clientComponent)) { + retrieveConfigFile(rops, configuration, configFile, name, user, + confInstallDir); + } + } else { + log.info("Installing CLIENT using script {}", clientScript); + expandAgentTar(agentPkgDir); + + JSONObject commandJson = getCommandJson(defaultConfig, config, metaInfo, clientInstallPath, name); + FileWriter file = new FileWriter(new File(cmdDir, "command.json")); + try { + file.write(commandJson.toString()); + + } catch (IOException e) { + log.error("Couldn't write command.json to file"); + } finally { + file.flush(); + file.close(); + } + + runCommand(appPkgDir, agentPkgDir, cmdDir, clientScript); + } + + } catch (IOException ioex) { + log.warn("Error while executing INSTALL command {}", ioex.getMessage()); + throw new SliderException("INSTALL client failed."); + } + } + + protected void runCommand( + File appPkgDir, + File agentPkgDir, + File cmdDir, + String clientScript) throws SliderException { + int exitCode = 0; + Exception exp = null; + try { + String clientScriptPath = appPkgDir.getAbsolutePath() + File.separator + "package" + + File.separator + clientScript; + List<String> command = Arrays.asList(AgentKeys.PYTHON_EXE, + "-S", + clientScriptPath, + "INSTALL", + cmdDir.getAbsolutePath() + File.separator + "command.json", + appPkgDir.getAbsolutePath() + File.separator + "package", + cmdDir.getAbsolutePath() + File.separator + "command-out.json", + "DEBUG"); + ProcessBuilder pb = new ProcessBuilder(command); + log.info("Command: " + StringUtils.join(pb.command(), " ")); + pb.environment().put(SliderKeys.PYTHONPATH, + agentPkgDir.getAbsolutePath() + + File.separator + "slider-agent" + File.pathSeparator + + agentPkgDir.getAbsolutePath() + + File.separator + "slider-agent/jinja2"); + log.info("{}={}", SliderKeys.PYTHONPATH, pb.environment().get(SliderKeys.PYTHONPATH)); + + Process proc = pb.start(); + InputStream stderr = proc.getErrorStream(); + InputStream stdout = proc.getInputStream(); + BufferedReader stdOutReader = new BufferedReader(new InputStreamReader(stdout)); + BufferedReader stdErrReader = new BufferedReader(new InputStreamReader(stderr)); + + proc.waitFor(); + + String line; + while ((line = stdOutReader.readLine()) != null) { + log.info("Stdout: " + line); + } + while ((line = stdErrReader.readLine()) != null) { + log.info("Stderr: " + line); + } + + exitCode = proc.exitValue(); + log.info("Exit value is {}", exitCode); + } catch (IOException e) { + exp = e; + } catch (InterruptedException e) { + exp = e; + } + + if (exitCode != 0) { + throw new SliderException("INSTALL client failed with exit code " + exitCode); + } + + if (exp != null) { + log.error("Error while executing INSTALL command {}. Stack trace {}", + exp.getMessage(), + ExceptionUtils.getStackTrace(exp)); + throw new SliderException("INSTALL client failed.", exp); + } + } + + private void expandAgentTar(File agentPkgDir) throws IOException { + String libDirProp = + System.getProperty(SliderKeys.PROPERTY_LIB_DIR); + File tarFile = new File(libDirProp, SliderKeys.AGENT_TAR); + expandTar(tarFile, agentPkgDir); + } + + private void expandTar(File tarFile, File destDir) throws IOException { + log.info("Expanding tar {} to {}", tarFile, destDir); + TarArchiveInputStream tarIn = new TarArchiveInputStream( + new GzipCompressorInputStream( + new BufferedInputStream( + new FileInputStream(tarFile) + ) + ) + ); + try { + TarArchiveEntry tarEntry = tarIn.getNextTarEntry(); + while (tarEntry != null) { + File destPath = new File(destDir, tarEntry.getName()); + File parent = destPath.getParentFile(); + if (!parent.exists()) { + parent.mkdirs(); + } + if (tarEntry.isDirectory()) { + destPath.mkdirs(); + } else { + byte[] byteToRead = new byte[1024]; + BufferedOutputStream buffOut = + new BufferedOutputStream(new FileOutputStream(destPath)); + try { + int len; + while ((len = tarIn.read(byteToRead)) != -1) { + buffOut.write(byteToRead, 0, len); + } + } finally { + buffOut.close(); + } + } + if ((tarEntry.getMode() & 0100) != 0) { + destPath.setExecutable(true); + } + tarEntry = tarIn.getNextTarEntry(); + } + } finally { + tarIn.close(); + } + } + + private void retrieveConfigFile(RegistryOperations rops, + Configuration configuration, ConfigFile configFile, String name, + String user, File destDir) throws IOException, SliderException { + log.info("Retrieving config {} to {}", configFile.getDictionaryName(), + destDir); + PublishedConfiguration published = ClientUtils.getConfigFromRegistry(rops, + configuration, configFile.getDictionaryName(), name, user, true); + ClientUtils.saveOrReturnConfig(published, configFile.getType(), + destDir, configFile.getFileName()); + } + + protected JSONObject getCommandJson(JSONObject defaultConfig, + JSONObject inputConfig, + Metainfo metainfo, + File clientInstallPath, + String name) throws SliderException { + try { + JSONObject pkgList = new JSONObject(); + pkgList.put(AgentKeys.PACKAGE_LIST, + AgentProviderService.getPackageListFromApplication(metainfo.getApplication())); + JSONObject obj = new JSONObject(); + obj.put("hostLevelParams", pkgList); + + String user = RegistryUtils.currentUser(); + JSONObject configuration = new JSONObject(); + JSONObject global = new JSONObject(); + global.put("app_install_dir", clientInstallPath.getAbsolutePath()); + global.put("app_user", user); + if (name != null) { + global.put("app_name", name); + } + + if (defaultConfig != null) { + readConfigEntries(defaultConfig, clientInstallPath, global, name, user); + } + if (inputConfig != null) { + readConfigEntries(inputConfig, clientInstallPath, global, name, user); + } + + configuration.put("global", global); + obj.put("configurations", configuration); + return obj; + } catch (JSONException jex) { + log.warn("Error while executing INSTALL command {}", jex.getMessage()); + throw new SliderException("INSTALL client failed."); + } + } + + private void readConfigEntries(JSONObject inpConfig, + File clientInstallPath, + JSONObject globalConfig, + String name, String user) + throws JSONException { + JSONObject globalSection = inpConfig.getJSONObject("global"); + Iterator it = globalSection.keys(); + while (it.hasNext()) { + String key = (String) it.next(); + String value = globalSection.getString(key); + if (SliderUtils.isSet(value)) { + value = value.replace("{app_install_dir}", clientInstallPath.getAbsolutePath()); + value = value.replace("{app_user}", user); + if (name != null) { + value = value.replace("{app_name}", name); + } + } + if (globalConfig.has(key)) { + // last one wins + globalConfig.remove(key); + } + globalConfig.put(key, value); + } + } + + private void extractFile(ZipInputStream zipInputStream, String filePath) throws IOException { + BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(filePath)); + try { + byte[] bytesRead = new byte[4096]; + int read = 0; + while ((read = zipInputStream.read(bytesRead)) != -1) { + output.write(bytesRead, 0, read); + } + } finally { + output.close(); + } + } + + private Metainfo getMetainfo(SliderFileSystem fs, String appDef) { + Metainfo metaInfo = metaInfoMap.get(appDef); + if (fs != null && metaInfo == null) { + try { + metaInfo = AgentUtils.getApplicationMetainfo(fs, appDef, false); + metaInfoMap.put(appDef, metaInfo); + } catch (IOException ioe) { + // Ignore missing metainfo file for now + log.info("Missing metainfo {}", ioe.getMessage()); + } catch (BadConfigException bce) { + log.info("Bad Configuration {}", bce.getMessage()); + } + } + return metaInfo; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java new file mode 100644 index 0000000..01a3f1a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers.agent; + +/* + + */ +public interface AgentKeys { + + String PROVIDER_AGENT = "agent"; + String ROLE_NODE = "echo"; + + /** + * Template stored in the slider classpath -to use if there is + * no site-specific template + * {@value} + */ + String CONF_RESOURCE = "org/apache/slider/providers/agent/conf/"; + /* URL to talk back to Agent Controller*/ + String CONTROLLER_URL = "agent.controller.url"; + /** + * The location of pre-installed agent path. + * This can be also be dynamically computed based on Yarn installation of agent. + */ + String PACKAGE_PATH = "agent.package.root"; + /** + * The location of the script implementing the command. + */ + String SCRIPT_PATH = "agent.script"; + /** + * Execution home for the agent. + */ + String APP_HOME = "app.home"; + String APP_ROOT = "site.global.app_root"; + String APP_CLIENT_ROOT = "client_root"; + /** + * Runas user of the application + */ + String RUNAS_USER = "site.global.app_user"; + /** + * Name of the service. + */ + String SERVICE_NAME = "app.name"; + String ARG_LABEL = "--label"; + String ARG_HOST = "--host"; + String ARG_PORT = "--port"; + String ARG_SECURED_PORT = "--secured_port"; + String ARG_ZOOKEEPER_QUORUM = "--zk-quorum"; + String ARG_ZOOKEEPER_REGISTRY_PATH = "--zk-reg-path"; + String ARG_DEBUG = "--debug"; + String AGENT_MAIN_SCRIPT_ROOT = "./infra/agent/slider-agent/"; + String AGENT_JINJA2_ROOT = "./infra/agent/slider-agent/jinja2"; + String AGENT_MAIN_SCRIPT = "agent/main.py"; + + String APP_DEF = "application.def"; + String ADDON_PREFIX = "application.addon."; + String ADDONS = "application.addons"; + String AGENT_VERSION = "agent.version"; + String AGENT_CONF = "agent.conf"; + String ADDON_FOR_ALL_COMPONENTS = "ALL"; + + String APP_RESOURCES = "application.resources"; + String APP_RESOURCES_DIR = "app/resources"; + + String APP_CONF_DIR = "app/conf"; + + String AGENT_INSTALL_DIR = "infra/agent"; + String APP_DEFINITION_DIR = "app/definition"; + String ADDON_DEFINITION_DIR = "addon/definition"; + String AGENT_CONFIG_FILE = "infra/conf/agent.ini"; + String AGENT_VERSION_FILE = "infra/version"; + String APP_PACKAGES_DIR = "app/packages"; + String PER_COMPONENT = "per.component"; + String PER_GROUP = "per.group"; + + String JAVA_HOME = "java_home"; + String PACKAGE_LIST = "package_list"; + String SYSTEM_CONFIGS = "system_configs"; + String WAIT_HEARTBEAT = "wait.heartbeat"; + String PYTHON_EXE = "python"; + String CREATE_DEF_ZK_NODE = "create.default.zookeeper.node"; + String HEARTBEAT_MONITOR_INTERVAL = "heartbeat.monitor.interval"; + String AGENT_INSTANCE_DEBUG_DATA = "agent.instance.debug.data"; + String AGENT_OUT_FILE = "slider-agent.out"; + String KEY_AGENT_TWO_WAY_SSL_ENABLED = "ssl.server.client.auth"; + String INFRA_RUN_SECURITY_DIR = "infra/run/security/"; + String CERT_FILE_LOCALIZATION_PATH = INFRA_RUN_SECURITY_DIR + "ca.crt"; + String KEY_CONTAINER_LAUNCH_DELAY = "container.launch.delay.sec"; + String TEST_RELAX_VERIFICATION = "test.relax.validation"; + String AM_CONFIG_GENERATION = "am.config.generation"; +} + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java new file mode 100644 index 0000000..18c6374 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers.agent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +class AgentLaunchParameter { + public static final Logger log = + LoggerFactory.getLogger(AgentLaunchParameter.class); + private static final String DEFAULT_PARAMETER = ""; + private static final String ANY_COMPONENT = "ANY"; + private static final String NONE_VALUE = "NONE"; + private final Map<String, CommandTracker> launchParameterTracker; + + public AgentLaunchParameter(String parameters) { + launchParameterTracker = parseExpectedLaunchParameters(parameters); + } + + /** + * Get command for the component type + * + * @param componentGroup + * + * @return + */ + public String getNextLaunchParameter(String componentGroup) { + if (launchParameterTracker != null) { + if (launchParameterTracker.containsKey(componentGroup) + || launchParameterTracker.containsKey(ANY_COMPONENT)) { + synchronized (this) { + CommandTracker indexTracker = null; + if (launchParameterTracker.containsKey(componentGroup)) { + indexTracker = launchParameterTracker.get(componentGroup); + } else { + indexTracker = launchParameterTracker.get(ANY_COMPONENT); + } + + return indexTracker.getNextCommand(); + } + } + } + + return DEFAULT_PARAMETER; + } + + /** + * Parse launch parameters of the form ANY:PARAM_FOR_FIRST:PARAM_FOR_SECOND:...:PARAM_FOR_REST|HBASE_MASTER:... + * + * E.g. ANY:DO_NOT_REGISTER:DO_NOT_HEARTBEAT:NONE For any container, first one gets DO_NOT_REGISTER second one gets + * DO_NOT_HEARTBEAT, then all of the rest get nothing + * + * E.g. HBASE_MASTER:FAIL_AFTER_START:NONE For HBASE_MASTER, first one gets FAIL_AFTER_START then "" for all + * + * @param launchParameters + * + * @return + */ + Map<String, CommandTracker> parseExpectedLaunchParameters(String launchParameters) { + Map<String, CommandTracker> trackers = null; + if (launchParameters != null && launchParameters.length() > 0) { + String[] componentSpecificParameters = launchParameters.split(Pattern.quote("|")); + for (String componentSpecificParameter : componentSpecificParameters) { + if (componentSpecificParameter.length() != 0) { + String[] parameters = componentSpecificParameter.split(Pattern.quote(":")); + + if (parameters.length > 1 && parameters[0].length() > 0) { + + for (int index = 1; index < parameters.length; index++) { + if (parameters[index].equals(NONE_VALUE)) { + parameters[index] = DEFAULT_PARAMETER; + } + } + + if (trackers == null) { + trackers = new HashMap<String, CommandTracker>(10); + } + String componentName = parameters[0]; + CommandTracker tracker = new CommandTracker(Arrays.copyOfRange(parameters, 1, parameters.length)); + trackers.put(componentName, tracker); + } + } + } + } + + return trackers; + } + + class CommandTracker { + private final int maxIndex; + private final String[] launchCommands; + private int currentIndex; + + CommandTracker(String[] launchCommands) { + this.currentIndex = 0; + this.maxIndex = launchCommands.length - 1; + this.launchCommands = launchCommands; + } + + String getNextCommand() { + String retVal = launchCommands[currentIndex]; + if (currentIndex != maxIndex) { + currentIndex++; + } + + return retVal; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderFactory.java new file mode 100644 index 0000000..d5ca749 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.providers.agent; + +import org.apache.hadoop.conf.Configuration; +import org.apache.slider.providers.AbstractClientProvider; +import org.apache.slider.providers.ProviderService; +import org.apache.slider.providers.SliderProviderFactory; + +public class AgentProviderFactory extends SliderProviderFactory { + + public static final String CLASSNAME = + "org.apache.slider.providers.agent.AgentProviderFactory"; + + public AgentProviderFactory() { + } + + public AgentProviderFactory(Configuration conf) { + super(conf); + } + + @Override + public AbstractClientProvider createClientProvider() { + return new AgentClientProvider(getConf()); + } + + @Override + public ProviderService createServerProvider() { + return new AgentProviderService(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org