http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java new file mode 100644 index 0000000..647bfe9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.provider; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.slider.api.ClusterNode; +import org.apache.slider.api.ResourceKeys; +import org.apache.slider.api.RoleKeys; +import org.apache.slider.api.resource.Application; +import org.apache.slider.api.resource.Component; +import org.apache.slider.api.resource.ConfigFile; +import org.apache.slider.api.resource.Configuration; +import org.apache.hadoop.yarn.service.conf.SliderKeys; +import org.apache.hadoop.yarn.service.conf.SliderXmlConfKeys; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.launch.AbstractLauncher; +import org.apache.slider.core.launch.ContainerLauncher; +import org.apache.slider.core.registry.docstore.ConfigFormat; +import org.apache.slider.core.registry.docstore.PublishedConfiguration; +import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter; +import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.slider.server.appmaster.state.StateAccessForProviders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import 
java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; + +import static org.apache.slider.api.ServiceApiConstants.*; +import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.$; + +/** + * This is a factoring out of methods handy for providers. Logging goes + * through the shared static {@link #log} logger. + */ +public class ProviderUtils implements RoleKeys, SliderKeys { + + protected static final Logger log = + LoggerFactory.getLogger(ProviderUtils.class); + + + /** + * Add oneself to the classpath. This does not work + * on minicluster test runs where the JAR is not built up. + * @param providerResources map of provider resources to add these entries to + * @param providerClass provider to add + * @param jarName name of the jar to use + * @param sliderFileSystem target filesystem + * @param tempPath path in the cluster FS for temp files + * @param libdir relative directory to place resources + * @param miniClusterTestRun true if minicluster is being used + * @return true if the JAR lookup completed without a FileNotFoundException; + * false if the JAR was not found and this is a mini cluster test run + * + * @throws FileNotFoundException if the JAR was not found and this is NOT + * a mini cluster test run + * @throws IOException IO problems + * @throws SliderException any Slider problem + */ + public static boolean addProviderJar( + Map<String, LocalResource> providerResources, + Class providerClass, + String jarName, + SliderFileSystem sliderFileSystem, + Path tempPath, + String libdir, + boolean miniClusterTestRun) throws + IOException, + SliderException { + try { + SliderUtils.putJar(providerResources, + sliderFileSystem, + providerClass, + tempPath, + libdir, + jarName); + return true; + } catch (FileNotFoundException e) { + /* a missing JAR is tolerated only on mini cluster test runs */ + if (miniClusterTestRun) { + return false; + } else { + throw e; + } + } + } + + /** + * Loads all dependency jars from the default path. 
+ * @param providerResources map of provider resources to add these entries to + * @param sliderFileSystem target filesystem + * @param tempPath path in the cluster FS for temp files + * @param libDir relative directory to place resources + * @param libLocalSrcDir explicitly supplied local libs dir + * @throws IOException trouble copying to HDFS + * @throws SliderException trouble copying to HDFS + */ + public static void addAllDependencyJars( + Map<String, LocalResource> providerResources, + SliderFileSystem sliderFileSystem, + Path tempPath, + String libDir, + String libLocalSrcDir) + throws IOException, SliderException { + if (SliderUtils.isSet(libLocalSrcDir)) { + File file = new File(libLocalSrcDir); + if (!file.exists() || !file.isDirectory()) { + throw new BadCommandArgumentsException( + "Supplied lib src dir %s is not valid", libLocalSrcDir); + } + } + SliderUtils.putAllJars(providerResources, sliderFileSystem, tempPath, + libDir, libLocalSrcDir); + } + + /** + * Replace every occurrence of each token key in the content with the + * token's value. Keys and values are both treated as literal text, so a + * value containing a dollar sign or a backslash is inserted verbatim. + * @param content string to substitute into + * @param tokensForSubstitution map of token to replacement value + * @return the content with all tokens substituted + */ + public static String substituteStrWithTokens(String content, + Map<String, String> tokensForSubstitution) { + for (Map.Entry<String, String> token : tokensForSubstitution.entrySet()) { + content = + content.replace(token.getKey(), token.getValue()); + } + return content; + } + + /** + * Substitute tokens into every value of the configs map, in place. + * Token keys and values are treated as literal text. Null config values + * are left untouched. 
+ * @param configs configs whose values will be substituted by the + * corresponding env in tokenMap + * @param tokenMap map of token to replacement value; if null, the values + * are left unchanged + */ + public static void substituteMapWithTokens(Map<String, String> configs, + Map<String, String> tokenMap) { + for (Map.Entry<String, String> entry : configs.entrySet()) { + String value = entry.getValue(); + if (tokenMap != null && value != null) { + for (Map.Entry<String, String> token : tokenMap.entrySet()) { + value = + value.replace(token.getKey(), token.getValue()); + } + } + entry.setValue(value); + } + } + + /** + * Get resource requirements from a String value. If value isn't specified, + * use the default value. If value is the + * {@code ResourceKeys.YARN_RESOURCE_MAX} keyword, use the max value. 
+ * @param val string value + * @param defVal default value + * @param maxVal maximum value + * @return int resource requirement + * @throws NumberFormatException if the value cannot be decoded as an integer + */ + public int getRoleResourceRequirement(String val, + int defVal, + int maxVal) { + if (val==null) { + val = Integer.toString(defVal); + } + Integer intVal; + if (ResourceKeys.YARN_RESOURCE_MAX.equals(val)) { + intVal = maxVal; + } else { + intVal = Integer.decode(val); + } + return intVal; + } + + + /** + * Localize the service keytabs for the application. + * @param launcher container launcher + * @param fileSystem file system + * @param application application whose configuration supplies the keytab + * settings + * @throws IOException trouble uploading to HDFS + */ + public void localizeServiceKeytabs(ContainerLauncher launcher, + SliderFileSystem fileSystem, Application application) throws IOException { + + Configuration conf = application.getConfiguration(); + String keytabPathOnHost = + conf.getProperty(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); + if (SliderUtils.isUnset(keytabPathOnHost)) { + String amKeytabName = + conf.getProperty(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); + String keytabDir = + conf.getProperty(SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR); + // we need to localize the keytab files in the directory + Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null, + application.getName()); + boolean serviceKeytabsDeployed = false; + if (fileSystem.getFileSystem().exists(keytabDirPath)) { + FileStatus[] keytabs = fileSystem.getFileSystem().listStatus( + keytabDirPath); + LocalResource keytabRes; + for (FileStatus keytab : keytabs) { + // compare from the file-name side so that an unset (null) AM keytab + // name does not raise a NullPointerException + if (!keytab.getPath().getName().equals(amKeytabName) + && keytab.getPath().getName().endsWith(".keytab")) { + serviceKeytabsDeployed = true; + log.info("Localizing keytab {}", keytab.getPath().getName()); + keytabRes = fileSystem.createAmResource(keytab.getPath(), + LocalResourceType.FILE); + launcher.addLocalResource(KEYTAB_DIR + "/" + + keytab.getPath().getName(), + keytabRes); + } + } + } + if (!serviceKeytabsDeployed) { + log.warn("No service keytabs for the 
application have been localized. " + + "If the application requires keytabs for secure operation, " + + "please ensure that the required keytabs have been uploaded " + + "to the folder {}", keytabDirPath); + } + } + } + + /** + * 1. Create all config files for a component on hdfs for localization + * 2. Add the config file to localResource + * @param launcher launcher the local resources are added to + * @param fs filesystem wrapper used to create the directory and files + * @param component component whose configuration files are processed + * @param tokensForSubstitution tokens substituted into the dest file path + * and the file contents + * @param instance component instance the files are created for + * @param context service context holding the cache of loaded templates + * @throws IOException on filesystem failures + */ + public static synchronized void createConfigFileAndAddLocalResource( + AbstractLauncher launcher, SliderFileSystem fs, Component component, + Map<String, String> tokensForSubstitution, ComponentInstance instance, + ServiceContext context) throws IOException { + Path compDir = + new Path(new Path(fs.getAppDir(), "components"), component.getName()); + Path compInstanceDir = + new Path(compDir, instance.getCompInstanceName()); + if (!fs.getFileSystem().exists(compInstanceDir)) { + log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir); + fs.getFileSystem().mkdirs(compInstanceDir, + new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); + instance.setCompInstanceDir(compInstanceDir); + } else { + log.info("Component instance conf dir already exists: " + compInstanceDir); + } + + if (log.isDebugEnabled()) { + log.debug("Tokens substitution for component instance: " + instance + .getCompInstanceName() + System.lineSeparator() + + tokensForSubstitution); + } + + for (ConfigFile originalFile : component.getConfiguration().getFiles()) { + ConfigFile configFile = originalFile.copy(); + String fileName = new Path(configFile.getDestFile()).getName(); + + // substitute file name; literal replacement so that token values + // containing '$' or '\' are inserted verbatim + for (Map.Entry<String, String> token : tokensForSubstitution.entrySet()) { + configFile.setDestFile(configFile.getDestFile() + .replace(token.getKey(), token.getValue())); + } + + Path remoteFile = new Path(compInstanceDir, fileName); + if (!fs.getFileSystem().exists(remoteFile)) { + log.info("Saving config file on hdfs for component " + instance + .getCompInstanceName() + ": " + configFile); + + if 
(configFile.getSrcFile() != null) { + // Load config file template + switch (configFile.getType()) { + case HADOOP_XML: + // Hadoop_xml_template + resolveHadoopXmlTemplateAndSaveOnHdfs(fs.getFileSystem(), + tokensForSubstitution, configFile, remoteFile, context); + break; + case TEMPLATE: + // plain-template + resolvePlainTemplateAndSaveOnHdfs(fs.getFileSystem(), + tokensForSubstitution, configFile, remoteFile, context); + break; + default: + log.info("Not supporting loading src_file for " + configFile); + break; + } + } else { + // non-template + resolveNonTemplateConfigsAndSaveOnHdfs(fs, tokensForSubstitution, + instance, configFile, fileName, remoteFile); + } + } + + // Add resource for localization + LocalResource configResource = + fs.createAmResource(remoteFile, LocalResourceType.FILE); + File destFile = new File(configFile.getDestFile()); + String symlink = APP_CONF_DIR + "/" + fileName; + if (destFile.isAbsolute()) { + launcher.addLocalResource(symlink, configResource, + configFile.getDestFile()); + log.info("Add config file for localization: " + symlink + " -> " + + configResource.getResource().getFile() + ", dest mount path: " + + configFile.getDestFile()); + } else { + launcher.addLocalResource(symlink, configResource); + log.info("Add config file for localization: " + symlink + " -> " + + configResource.getResource().getFile()); + } + } + } + + private static void resolveNonTemplateConfigsAndSaveOnHdfs(SliderFileSystem fs, + Map<String, String> tokensForSubstitution, ComponentInstance instance, + ConfigFile configFile, String fileName, Path remoteFile) + throws IOException { + // substitute non-template configs + substituteMapWithTokens(configFile.getProps(), tokensForSubstitution); + + // write configs onto hdfs + PublishedConfiguration publishedConfiguration = + new PublishedConfiguration(fileName, + configFile.getProps().entrySet()); + if (!fs.getFileSystem().exists(remoteFile)) { + PublishedConfigurationOutputter configurationOutputter = + 
PublishedConfigurationOutputter.createOutputter( + ConfigFormat.resolve(configFile.getType().toString()), + publishedConfiguration); + try (FSDataOutputStream os = fs.getFileSystem().create(remoteFile)) { + configurationOutputter.save(os); + os.flush(); + } + } else { + log.info("Component instance = " + instance.getCompInstanceName() + + ", config file already exists: " + remoteFile); + } + } + + // 1. substitute config template - only handle hadoop_xml format + // 2. save on hdfs + @SuppressWarnings("unchecked") + private static void resolveHadoopXmlTemplateAndSaveOnHdfs(FileSystem fs, + Map<String, String> tokensForSubstitution, ConfigFile configFile, + Path remoteFile, ServiceContext context) throws IOException { + Map<String, String> conf; + try { + conf = (Map<String, String>) context.configCache.get(configFile); + } catch (ExecutionException e) { + log.info("Failed to load config file: " + configFile, e); + return; + } + // make a copy for substitution + org.apache.hadoop.conf.Configuration confCopy = + new org.apache.hadoop.conf.Configuration(false); + for (Map.Entry<String, String> entry : conf.entrySet()) { + confCopy.set(entry.getKey(), entry.getValue()); + } + // substitute properties + for (Map.Entry<String, String> entry : configFile.getProps().entrySet()) { + confCopy.set(entry.getKey(), entry.getValue()); + } + // substitute env variables; literal replacement so that token values + // containing '$' or '\' are inserted verbatim + for (Map.Entry<String, String> entry : confCopy) { + String val = entry.getValue(); + if (val != null) { + for (Map.Entry<String, String> token : tokensForSubstitution + .entrySet()) { + val = val.replace(token.getKey(), token.getValue()); + confCopy.set(entry.getKey(), val); + } + } + } + // save on hdfs + try (OutputStream output = fs.create(remoteFile)) { + confCopy.writeXml(output); + log.info("Reading config from: " + configFile.getSrcFile() + + ", writing to: " + remoteFile); + } + } + + // 1) read the template as a string + // 2) do token substitution + // 3) save on hdfs + private static 
void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs, + Map<String, String> tokensForSubstitution, ConfigFile configFile, + Path remoteFile, ServiceContext context) { + String content; + try { + content = (String) context.configCache.get(configFile); + } catch (ExecutionException e) { + log.info("Failed to load config file: " + configFile, e); + return; + } + // substitute tokens + content = substituteStrWithTokens(content, tokensForSubstitution); + + try (OutputStream output = fs.create(remoteFile)) { + org.apache.commons.io.IOUtils.write(content, output); + } catch (IOException e) { + // keep the cause in the log so a failed write can be diagnosed + log.warn("Failed to create " + remoteFile, e); + } + } + + /** + * Get initial component token map to be substituted into config values. + * @param instance component instance the token values are read from + * @return tokens to replace + */ + public static Map<String, String> initCompTokensForSubstitute( + ComponentInstance instance) { + Map<String, String> tokens = new HashMap<>(); + tokens.put(COMPONENT_NAME, instance.getCompSpec().getName()); + // fixed locale, matching the upper-casing in addComponentHostTokens + tokens.put(COMPONENT_NAME_LC, + instance.getCompSpec().getName().toLowerCase(Locale.ENGLISH)); + tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName()); + tokens.put(CONTAINER_ID, instance.getContainer().getId().toString()); + tokens.put(COMPONENT_ID, + String.valueOf(instance.getCompInstanceId().getId())); + return tokens; + } + + /** + * Add ROLE_HOST tokens for substitution into config values. + * @param tokens existing tokens + * @param amState access to AM state; nothing is added when this is null + */ + public static void addComponentHostTokens(Map<String, String> tokens, + StateAccessForProviders amState) { + if (amState == null) { + return; + } + for (Map.Entry<String, Map<String, ClusterNode>> entry : + amState.getRoleClusterNodeMapping().entrySet()) { + String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST"; + String hosts = StringUtils.join(",", + getHostsList(entry.getValue().values(), true)); + tokens.put($(tokenName), hosts); + } + } + + /** + * Return a list of hosts based on current ClusterNodes. 
+ * @param values cluster nodes to list + * @param hostOnly true to add only the host, false to add host/name + * @return hosts in the iteration order of the given nodes + */ + public static Iterable<String> getHostsList(Collection<ClusterNode> values, + boolean hostOnly) { + List<String> hostNames = new ArrayList<>(); + for (ClusterNode node : values) { + if (hostOnly) { + hostNames.add(node.host); + } else { + hostNames.add(node.host + "/" + node.name); + } + } + return hostNames; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultClientProvider.java new file mode 100644 index 0000000..32cedb6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultClientProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.provider.defaultImpl; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; +import org.apache.slider.api.resource.Artifact; +import org.apache.slider.api.resource.ConfigFile; + +import java.io.IOException; +import java.nio.file.Paths; + +/** + * Client provider of the default implementation: artifacts are not validated, + * and config files are only checked for a non-absolute dest_file. + */ +public class DefaultClientProvider extends AbstractClientProvider { + + public DefaultClientProvider() { + } + + /** No artifact validation is performed by this provider. */ + @Override + public void validateArtifact(Artifact artifact, FileSystem fileSystem) { + } + + /** + * Reject config files whose dest_file is an absolute path. + * @throws IllegalArgumentException if dest_file is absolute + */ + @Override + protected void validateConfigFile(ConfigFile configFile, FileSystem + fileSystem) throws IOException { + // validate dest_file is not absolute + if (Paths.get(configFile.getDestFile()).isAbsolute()) { + throw new IllegalArgumentException( + "Dest_file must not be absolute path: " + configFile.getDestFile()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultProviderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultProviderFactory.java new file mode 100644 index 0000000..868bba8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultProviderFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.provider.defaultImpl; + +import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; +import org.apache.hadoop.yarn.service.provider.ProviderService; +import org.apache.hadoop.yarn.service.provider.ProviderFactory; + +/** + * Factory for the default provider. A single factory instance is shared via + * {@link #getInstance()}, and every call to a create method returns the same + * provider object. + */ +public final class DefaultProviderFactory extends ProviderFactory { + private static final ProviderFactory FACTORY = new + DefaultProviderFactory(); + + private DefaultProviderFactory() {} + + /* Holds the shared client provider; created when this class is initialized. */ + private static class Client { + static final AbstractClientProvider PROVIDER = new DefaultClientProvider(); + } + + /* Holds the shared server provider; created when this class is initialized. */ + private static class Server { + static final ProviderService PROVIDER = new DefaultProviderService(); + } + + @Override + public AbstractClientProvider createClientProvider() { + return Client.PROVIDER; + } + + @Override + public ProviderService createServerProvider() { + return Server.PROVIDER; + } + + /** + * @return the shared factory instance + */ + public static ProviderFactory getInstance() { + return FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultProviderService.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultProviderService.java new file mode 100644 index 0000000..a77214c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/defaultImpl/DefaultProviderService.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.provider.defaultImpl; + +import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; +import org.apache.hadoop.yarn.service.provider.AbstractProviderService; +import org.apache.slider.api.resource.Application; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.core.launch.AbstractLauncher; + +import java.io.IOException; + +/** + * Provider service of the default implementation; it does no artifact + * processing of its own. + */ +public class DefaultProviderService extends AbstractProviderService { + + /** Intentionally empty: this provider has no artifact to process. */ + @Override + public void processArtifact(AbstractLauncher launcher, + ComponentInstance compInstance, SliderFileSystem fileSystem, + Application application) + throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerClientProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerClientProvider.java new file mode 100644 index 0000000..c1f225c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerClientProvider.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.provider.docker; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.slider.api.resource.Artifact; +import org.apache.slider.api.resource.ConfigFile; +import org.apache.hadoop.yarn.service.conf.SliderKeys; +import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; +import org.apache.slider.util.RestApiErrorMessages; + +import java.io.IOException; + +/** + * Client provider for docker artifacts: requires an artifact with a + * non-empty id and performs no config file validation. + */ +public class DockerClientProvider extends AbstractClientProvider + implements SliderKeys { + + public DockerClientProvider() { + super(); + } + + /** + * Check that an artifact is present and has a non-empty id. + * @throws IllegalArgumentException if the artifact is null or its id is + * null or empty + */ + @Override + public void validateArtifact(Artifact artifact, FileSystem fileSystem) { + if (artifact == null) { + throw new IllegalArgumentException( + RestApiErrorMessages.ERROR_ARTIFACT_INVALID); + } + if (StringUtils.isEmpty(artifact.getId())) { + throw new IllegalArgumentException( + RestApiErrorMessages.ERROR_ARTIFACT_ID_INVALID); + } + } + + /** No config file validation is performed by this provider. */ + @Override + protected void validateConfigFile(ConfigFile configFile, FileSystem + fileSystem) throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerKeys.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerKeys.java new file mode 100644 index 0000000..f30c002 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerKeys.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.provider.docker; + +/** + * Name, configuration keys and default values of the docker provider. All + * configuration keys share the {@link #DOCKER_PREFIX} prefix. + */ +public interface DockerKeys { + String PROVIDER_DOCKER = "docker"; + String DOCKER_PREFIX = "docker."; + String DOCKER_IMAGE = DOCKER_PREFIX + "image"; + String DOCKER_NETWORK = DOCKER_PREFIX + "network"; + String DOCKER_USE_PRIVILEGED = DOCKER_PREFIX + "usePrivileged"; + String DOCKER_START_COMMAND = DOCKER_PREFIX + "startCommand"; + + /* NOTE(review): presumably the fallbacks for DOCKER_NETWORK and DOCKER_USE_PRIVILEGED - confirm against the callers */ + String DEFAULT_DOCKER_NETWORK = "bridge"; + Boolean DEFAULT_DOCKER_USE_PRIVILEGED = false; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerProviderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerProviderFactory.java new file mode 100644 index 0000000..57330ab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerProviderFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.provider.docker; + +import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; +import org.apache.hadoop.yarn.service.provider.ProviderService; +import org.apache.hadoop.yarn.service.provider.ProviderFactory; + +public class DockerProviderFactory extends ProviderFactory { + private static final ProviderFactory FACTORY = new + DockerProviderFactory(); + + private DockerProviderFactory() { + } + + private static class Client { + static final AbstractClientProvider PROVIDER = new DockerClientProvider(); + } + + private static class Server { + static final ProviderService PROVIDER = new DockerProviderService(); + } + + @Override + public AbstractClientProvider createClientProvider() { + return Client.PROVIDER; + } + + @Override + public ProviderService createServerProvider() { + return Server.PROVIDER; + } + + public static ProviderFactory getInstance() { + return FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerProviderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerProviderService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerProviderService.java new file mode 100644 index 0000000..c20eaad --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/docker/DockerProviderService.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.provider.docker; + +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; +import org.apache.hadoop.yarn.service.provider.AbstractProviderService; +import org.apache.slider.api.resource.Application; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.core.launch.AbstractLauncher; + +import java.io.IOException; +import java.text.MessageFormat; + +public class DockerProviderService extends AbstractProviderService + implements DockerKeys { + + public void processArtifact(AbstractLauncher launcher, + ComponentInstance compInstance, SliderFileSystem fileSystem, + Application application) throws IOException{ + launcher.setYarnDockerMode(true); + launcher.setDockerImage(compInstance.getCompSpec().getArtifact().getId()); + launcher.setDockerNetwork(compInstance.getCompSpec().getConfiguration() + .getProperty(DOCKER_NETWORK, DEFAULT_DOCKER_NETWORK)); + String domain = compInstance.getComponent().getScheduler().getConfig() + .get(RegistryConstants.KEY_DNS_DOMAIN); + String hostname; + if (domain == null || domain.isEmpty()) { + hostname = MessageFormat + .format("{0}.{1}.{2}", compInstance.getCompInstanceName(), + application.getName(), RegistryUtils.currentUser()); + } else { + hostname = MessageFormat + .format("{0}.{1}.{2}.{3}", compInstance.getCompInstanceName(), + application.getName(), RegistryUtils.currentUser(), domain); + } + launcher.setDockerHostname(hostname); + launcher.setRunPrivilegedContainer( + compInstance.getCompSpec().getRunPrivilegedContainer()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballClientProvider.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballClientProvider.java new file mode 100644 index 0000000..2b54be9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballClientProvider.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.provider.tarball; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.slider.api.resource.Artifact; +import org.apache.slider.api.resource.ConfigFile; +import org.apache.hadoop.yarn.service.conf.SliderKeys; +import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; +import org.apache.slider.util.RestApiErrorMessages; + +import java.io.IOException; +import java.nio.file.Paths; + +public class TarballClientProvider extends AbstractClientProvider + implements SliderKeys { + + public TarballClientProvider() { + } + + @Override + public void validateArtifact(Artifact artifact, FileSystem fs) + throws IOException { + if (artifact == null) { + throw new IllegalArgumentException( + RestApiErrorMessages.ERROR_ARTIFACT_INVALID); + } + if (StringUtils.isEmpty(artifact.getId())) { + throw new IllegalArgumentException( + RestApiErrorMessages.ERROR_ARTIFACT_ID_INVALID); + } + Path p = new Path(artifact.getId()); + if (!fs.exists(p)) { + throw new IllegalArgumentException( "Artifact tarball does not exist " + + artifact.getId()); + } + } + + @Override + protected void validateConfigFile(ConfigFile configFile, FileSystem + fileSystem) throws IOException { + // validate dest_file is not absolute + if (Paths.get(configFile.getDestFile()).isAbsolute()) { + throw new IllegalArgumentException( + "Dest_file must not be absolute path: " + configFile.getDestFile()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballProviderFactory.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballProviderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballProviderFactory.java new file mode 100644 index 0000000..9d81f66 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballProviderFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.provider.tarball; + +import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; +import org.apache.hadoop.yarn.service.provider.ProviderService; +import org.apache.hadoop.yarn.service.provider.ProviderFactory; + +public class TarballProviderFactory extends ProviderFactory { + private static final ProviderFactory FACTORY = new + TarballProviderFactory(); + + private TarballProviderFactory() { + } + + private static class Client { + static final AbstractClientProvider PROVIDER = new TarballClientProvider(); + } + + private static class Server { + static final ProviderService PROVIDER = new TarballProviderService(); + } + + @Override + public AbstractClientProvider createClientProvider() { + return Client.PROVIDER; + } + + @Override + public ProviderService createServerProvider() { + return Server.PROVIDER; + } + + public static ProviderFactory getInstance() { + return FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballProviderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballProviderService.java new file mode 100644 index 0000000..3c3d425 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/provider/tarball/TarballProviderService.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.provider.tarball; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; +import org.apache.hadoop.yarn.service.provider.AbstractProviderService; +import org.apache.slider.api.resource.Application; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.core.launch.AbstractLauncher; + +import java.io.IOException; + +public class TarballProviderService extends AbstractProviderService { + + @Override + public void processArtifact(AbstractLauncher launcher, + ComponentInstance instance, SliderFileSystem fileSystem, + Application application) + throws IOException { + Path artifact = new Path(instance.getCompSpec().getArtifact().getId()); + if (!fileSystem.isFile(artifact)) { + throw new IOException( + "Package doesn't exist as a resource: " + artifact.toString()); + } + log.info("Adding resource {}", artifact.toString()); + LocalResourceType type = LocalResourceType.ARCHIVE; + LocalResource packageResource = fileSystem.createAmResource(artifact, type); + 
launcher.addLocalResource(APP_LIB_DIR, packageResource); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java new file mode 100644 index 0000000..cf4e836 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.timelineservice; + +import org.apache.commons.configuration2.SubsetConfiguration; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.hadoop.metrics2.MetricsTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Write the metrics to a ATSv2. Generally, this class is instantiated via + * hadoop-metrics2 property files. Specifically, you would create this class by + * adding the following to by This would actually be set as: <code> + * [prefix].sink.[some instance name].class + * =org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink + * </code>, where <tt>prefix</tt> is "atsv2": and <tt>some instance name</tt> is + * just any unique name, so properties can be differentiated if there are + * multiple sinks of the same type created + */ +public class ServiceMetricsSink implements MetricsSink { + + private static final Logger log = + LoggerFactory.getLogger(ServiceMetricsSink.class); + + private ServiceTimelinePublisher serviceTimelinePublisher; + + public ServiceMetricsSink() { + + } + + public ServiceMetricsSink(ServiceTimelinePublisher publisher) { + serviceTimelinePublisher = publisher; + } + + /** + * Publishes service and component metrics to ATS. + */ + @Override + public void putMetrics(MetricsRecord record) { + if (serviceTimelinePublisher.isStopped()) { + log.warn("ServiceTimelinePublisher has stopped. 
" + + "Not publishing any more metrics to ATS."); + return; + } + + boolean isServiceMetrics = false; + boolean isComponentMetrics = false; + String appId = null; + for (MetricsTag tag : record.tags()) { + if (tag.name().equals("type") && tag.value().equals("service")) { + isServiceMetrics = true; + } else if (tag.name().equals("type") && tag.value().equals("component")) { + isComponentMetrics = true; + break; // if component metrics, no more information required from tag so + // break the loop + } else if (tag.name().equals("appId")) { + appId = tag.value(); + } + } + + if (isServiceMetrics && appId != null) { + if (log.isDebugEnabled()) { + log.debug("Publishing service metrics. " + record); + } + serviceTimelinePublisher.publishMetrics(record.metrics(), appId, + ServiceTimelineEntityType.SERVICE_ATTEMPT.toString(), + record.timestamp()); + } else if (isComponentMetrics) { + if (log.isDebugEnabled()) { + log.debug("Publishing Component metrics. " + record); + } + serviceTimelinePublisher.publishMetrics(record.metrics(), record.name(), + ServiceTimelineEntityType.COMPONENT.toString(), record.timestamp()); + } + } + + @Override + public void init(SubsetConfiguration conf) { + } + + @Override + public void flush() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java new file mode 100644 index 
0000000..d5c9539 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.hadoop.yarn.service.timelineservice;
+
+/**
+ * Types of the entities that are published to ATS. ServiceMetricsSink passes
+ * the constant's {@code toString()} value as the entity type when it
+ * publishes metrics.
+ */
+public enum ServiceTimelineEntityType {
+  /**
+   * Used for publishing service entity information.
+   */
+  SERVICE_ATTEMPT,
+
+  /**
+   * Used for publishing component entity information.
+   */
+  COMPONENT,
+
+  /**
+   * Used for publishing component instance entity information.
+   */
+  COMPONENT_INSTANCE
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java new file mode 100644 index 0000000..7f7f9a1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.timelineservice; + +/** + * Events that are used to store in ATS. 
+ */
+public enum ServiceTimelineEvent {
+  // Id of the event ServiceTimelinePublisher adds when a service attempt
+  // is registered.
+  SERVICE_ATTEMPT_REGISTERED,
+
+  // Id of the event ServiceTimelinePublisher adds when a service attempt
+  // is unregistered.
+  SERVICE_ATTEMPT_UNREGISTERED,
+
+  // Id of the event ServiceTimelinePublisher adds when a component
+  // instance is started.
+  COMPONENT_INSTANCE_REGISTERED,
+
+  // NOTE(review): presumably the counterpart added when a component
+  // instance finishes -- the publishing code is not visible here.
+  COMPONENT_INSTANCE_UNREGISTERED,
+
+  // NOTE(review): presumably added when a component instance's details
+  // change -- the publishing code is not visible here.
+  COMPONENT_INSTANCE_UPDATED
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java new file mode 100644 index 0000000..4f39921 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.timelineservice; + +/** + * Constants which are stored as key in ATS + */ +public final class ServiceTimelineMetricsConstants { + + public static final String URI = "URI"; + + public static final String NAME = "NAME"; + + public static final String STATE = "STATE"; + + public static final String EXIT_STATUS_CODE = "EXIT_STATUS_CODE"; + + public static final String EXIT_REASON = "EXIT_REASON"; + + public static final String DIAGNOSTICS_INFO = "DIAGNOSTICS_INFO"; + + public static final String LAUNCH_TIME = "LAUNCH_TIME"; + + public static final String QUICK_LINKS = "QUICK_LINKS"; + + public static final String LAUNCH_COMMAND = "LAUNCH_COMMAND"; + + public static final String TOTAL_CONTAINERS = "NUMBER_OF_CONTAINERS"; + + public static final String RUNNING_CONTAINERS = + "NUMBER_OF_RUNNING_CONTAINERS"; + + /** + * Artifacts constants. + */ + public static final String ARTIFACT_ID = "ARTIFACT_ID"; + + public static final String ARTIFACT_TYPE = "ARTIFACT_TYPE"; + + public static final String ARTIFACT_URI = "ARTIFACT_URI"; + + /** + * Resource constants. + */ + public static final String RESOURCE_CPU = "RESOURCE_CPU"; + + public static final String RESOURCE_MEMORY = "RESOURCE_MEMORY"; + + public static final String RESOURCE_PROFILE = "RESOURCE_PROFILE"; + + /** + * component instance constants. + */ + public static final String IP = "IP"; + + public static final String HOSTNAME = "HOSTNAME"; + + public static final String BARE_HOST = "BARE_HOST"; + + public static final String COMPONENT_NAME = "COMPONENT_NAME"; + + public static final String COMPONENT_INSTANCE_NAME = "COMPONENT_INSTANCE_NAME"; + + /** + * component constants. 
+ */ + public static final String DEPENDENCIES = "DEPENDENCIES"; + + public static final String DESCRIPTION = "DESCRIPTION"; + + public static final String UNIQUE_COMPONENT_SUPPORT = + "UNIQUE_COMPONENT_SUPPORT"; + + public static final String RUN_PRIVILEGED_CONTAINER = + "RUN_PRIVILEGED_CONTAINER"; + + public static final String PLACEMENT_POLICY = "PLACEMENT_POLICY"; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java new file mode 100644 index 0000000..f115063 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.timelineservice; + +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.slider.api.resource.Application; +import org.apache.slider.api.resource.Component; +import org.apache.slider.api.resource.ConfigFile; +import org.apache.slider.api.resource.Configuration; +import org.apache.slider.api.resource.Container; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.server.appmaster.actions.ActionStopSlider; +import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.slider.server.appmaster.state.AppState; +import org.apache.slider.server.appmaster.state.RoleInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +/** + * 
A single service that publishes all the Timeline Entities.
 * <p>
 * Each public method builds a {@code TimelineEntity} describing a service
 * attempt, a component or a component instance and hands it to the wrapped
 * {@code TimelineV2Client}. Exceptions raised while handing an entity to the
 * client are logged by {@code putEntity} and are not propagated.
 */
public class ServiceTimelinePublisher extends CompositeService {

  /**
   * Upper bound on the amount of configuration data put into a single
   * entity. The size is measured as key length plus value length, in
   * characters.
   */
  public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024;

  private static final Logger log =
      LoggerFactory.getLogger(ServiceTimelinePublisher.class);

  /** Client used to publish entities; may be null if none was supplied. */
  private final TimelineV2Client timelineClient;

  /** Set to true once {@link #serviceStop()} has been invoked. */
  private volatile boolean stopped = false;

  /**
   * Creates a publisher that sends entities through the given client.
   *
   * @param client timeline client used by every publish method
   */
  public ServiceTimelinePublisher(TimelineV2Client client) {
    super(ServiceTimelinePublisher.class.getName());
    timelineClient = client;
  }

  /**
   * Registers the timeline client as a child service and then delegates to
   * the parent so that the registered child is initialised as well.
   *
   * @param configuration configuration handed to the parent initialisation
   * @throws Exception if the parent initialisation fails
   */
  @Override
  protected void serviceInit(org.apache.hadoop.conf.Configuration configuration)
      throws Exception {
    // A null child would break the composite's lifecycle iteration.
    if (timelineClient != null) {
      addService(timelineClient);
    }
    // Without this call the child added above is never initialised.
    super.serviceInit(configuration);
  }

  /**
   * Marks this publisher as stopped and then stops the parent service.
   *
   * @throws Exception if the parent fails to stop
   */
  @Override
  protected void serviceStop() throws Exception {
    stopped = true;
    super.serviceStop();
  }

  /**
   * @return true once {@link #serviceStop()} has been invoked
   */
  public boolean isStopped() {
    return stopped;
  }

  /**
   * Publishes the service attempt entity with a registration event, then the
   * service level configurations, then one entity per component.
   *
   * @param application application whose attempt has been registered
   */
  public void serviceAttemptRegistered(Application application) {
    // Fall back to "now" when the application carries no launch time.
    long launchTimeMillis = application.getLaunchTime() == null
        ? System.currentTimeMillis() : application.getLaunchTime().getTime();

    TimelineEntity entity = createServiceAttemptEntity(application.getId());
    entity.setCreatedTime(launchTimeMillis);

    // create info keys
    Map<String, Object> entityInfos = new HashMap<String, Object>();
    entityInfos.put(ServiceTimelineMetricsConstants.NAME,
        application.getName());
    if (application.getState() != null) {
      entityInfos.put(ServiceTimelineMetricsConstants.STATE,
          application.getState().toString());
    }
    entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_TIME,
        launchTimeMillis);
    entity.addInfo(ServiceTimelineMetricsConstants.QUICK_LINKS,
        application.getQuicklinks());
    entity.addInfo(entityInfos);

    // add an event
    entity.addEvent(newEvent(
        ServiceTimelineEvent.SERVICE_ATTEMPT_REGISTERED.toString(),
        launchTimeMillis));

    // publish before configurations published
    putEntity(entity);

    // publish application specific configurations
    publishConfigurations(application.getConfiguration(), application.getId(),
        ServiceTimelineEntityType.SERVICE_ATTEMPT.toString(), true);

    // publish component as separate entity.
    publishComponents(application.getComponents());
  }

  /**
   * Re-publishes the quick links of the service attempt entity.
   *
   * @param application application whose quick links are published
   */
  public void serviceAttemptUpdated(Application application) {
    TimelineEntity entity = createServiceAttemptEntity(application.getId());
    entity.addInfo(ServiceTimelineMetricsConstants.QUICK_LINKS,
        application.getQuicklinks());
    putEntity(entity);
  }

  /**
   * Publishes an unregistration event for the service attempt identified by
   * the application id of the context's attempt id.
   *
   * @param context service context providing the attempt id
   */
  public void serviceAttemptUnregistered(ServiceContext context) {
    TimelineEntity entity = createServiceAttemptEntity(
        context.attemptId.getApplicationId().toString());

    // NOTE(review): the state is always recorded as FAILED here, whatever
    // the reason for unregistering - confirm this is intended.
    Map<String, Object> entityInfos = new HashMap<String, Object>();
    entityInfos.put(ServiceTimelineMetricsConstants.STATE,
        FinalApplicationStatus.FAILED);
    entity.addInfo(entityInfos);

    // add an event
    entity.addEvent(newEvent(
        ServiceTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString(),
        System.currentTimeMillis()));

    putEntity(entity);
  }

  /**
   * Publishes an unregistration event for the service attempt together with
   * the exit code, final status and, when present, the exit message and the
   * exception carried by the stop action.
   *
   * @param appState application state providing the cluster status id
   * @param stopAction stop action describing how the service ended
   */
  public void serviceAttemptUnregistered(AppState appState,
      ActionStopSlider stopAction) {
    long currentTimeMillis = System.currentTimeMillis();

    TimelineEntity entity =
        createServiceAttemptEntity(appState.getClusterStatus().getId());

    // add info
    Map<String, Object> entityInfos = new HashMap<String, Object>();
    entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
        stopAction.getExitCode());
    entityInfos.put(ServiceTimelineMetricsConstants.STATE,
        stopAction.getFinalApplicationStatus().toString());
    if (stopAction.getMessage() != null) {
      entityInfos.put(ServiceTimelineMetricsConstants.EXIT_REASON,
          stopAction.getMessage());
    }
    if (stopAction.getEx() != null) {
      entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO,
          stopAction.getEx().toString());
    }
    entity.addInfo(entityInfos);

    // add an event
    entity.addEvent(newEvent(
        ServiceTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString(),
        currentTimeMillis));

    putEntity(entity);
  }

  /**
   * Publishes a component instance entity with a registration event.
   *
   * @param container container the instance runs in
   * @param instance component instance that has started
   */
  public void componentInstanceStarted(Container container,
      ComponentInstance instance) {
    // Same fallback as for the service attempt: use "now" if no launch time.
    long launchTimeMillis = container.getLaunchTime() == null
        ? System.currentTimeMillis() : container.getLaunchTime().getTime();

    TimelineEntity entity = createComponentInstanceEntity(container.getId());
    entity.setCreatedTime(launchTimeMillis);

    // create info keys
    Map<String, Object> entityInfos = new HashMap<String, Object>();
    entityInfos.put(ServiceTimelineMetricsConstants.BARE_HOST,
        container.getBareHost());
    entityInfos.put(ServiceTimelineMetricsConstants.STATE,
        container.getState().toString());
    entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_TIME,
        launchTimeMillis);
    entityInfos.put(ServiceTimelineMetricsConstants.COMPONENT_NAME,
        instance.getCompName());
    entityInfos.put(ServiceTimelineMetricsConstants.COMPONENT_INSTANCE_NAME,
        instance.getCompInstanceName());
    entity.addInfo(entityInfos);

    // add an event
    entity.addEvent(newEvent(
        ServiceTimelineEvent.COMPONENT_INSTANCE_REGISTERED.toString(),
        launchTimeMillis));

    putEntity(entity);
  }

  /**
   * Publishes an unregistration event for a finished role instance along
   * with its exit code and diagnostics.
   *
   * @param instance role instance that has finished
   */
  public void componentInstanceFinished(RoleInstance instance) {
    TimelineEntity entity = createComponentInstanceEntity(instance.id);

    // create info keys
    Map<String, Object> entityInfos = new HashMap<String, Object>();
    entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
        instance.exitCode);
    entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO,
        instance.diagnostics);
    // TODO need to change the state based on enum value.
    entityInfos.put(ServiceTimelineMetricsConstants.STATE, "FINISHED");
    entity.addInfo(entityInfos);

    // add an event
    entity.addEvent(newEvent(
        ServiceTimelineEvent.COMPONENT_INSTANCE_UNREGISTERED.toString(),
        System.currentTimeMillis()));

    putEntity(entity);
  }

  /**
   * Publishes an unregistration event for a finished component instance
   * along with the supplied exit code, state and diagnostics.
   *
   * @param instance component instance that has finished
   * @param exitCode exit code to record
   * @param state container state to record
   * @param diagnostics diagnostics text to record
   */
  public void componentInstanceFinished(ComponentInstance instance,
      int exitCode, ContainerState state, String diagnostics) {
    TimelineEntity entity = createComponentInstanceEntity(
        instance.getContainer().getId().toString());

    // create info keys
    Map<String, Object> entityInfos = new HashMap<String, Object>();
    entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
        exitCode);
    entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO,
        diagnostics);
    entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
    entity.addInfo(entityInfos);

    // add an event
    entity.addEvent(newEvent(
        ServiceTimelineEvent.COMPONENT_INSTANCE_UNREGISTERED.toString(),
        System.currentTimeMillis()));

    putEntity(entity);
  }

  /**
   * Publishes the current IP, hostname and state of a container together
   * with an update event.
   *
   * @param container container whose details are published
   */
  public void componentInstanceUpdated(Container container) {
    TimelineEntity entity = createComponentInstanceEntity(container.getId());

    // create info keys
    Map<String, Object> entityInfos = new HashMap<String, Object>();
    entityInfos.put(ServiceTimelineMetricsConstants.IP, container.getIp());
    entityInfos.put(ServiceTimelineMetricsConstants.HOSTNAME,
        container.getHostname());
    entityInfos.put(ServiceTimelineMetricsConstants.STATE,
        container.getState().toString());
    entity.addInfo(entityInfos);

    entity.addEvent(newEvent(
        ServiceTimelineEvent.COMPONENT_INSTANCE_UPDATED.toString(),
        System.currentTimeMillis()));

    putEntity(entity);
  }

  /**
   * Publishes one entity per component, followed by that component's
   * configurations. Optional parts of a component (artifact, resource,
   * launch command, flags, placement policy) are skipped when absent.
   *
   * @param components components to publish; null is treated as empty
   */
  private void publishComponents(List<Component> components) {
    if (components == null) {
      return;
    }
    long currentTimeMillis = System.currentTimeMillis();
    for (Component component : components) {
      TimelineEntity entity = createComponentEntity(component.getName());
      entity.setCreatedTime(currentTimeMillis);

      // create info keys
      Map<String, Object> entityInfos = new HashMap<String, Object>();
      if (component.getArtifact() != null) {
        entityInfos.put(ServiceTimelineMetricsConstants.ARTIFACT_ID,
            component.getArtifact().getId());
        if (component.getArtifact().getType() != null) {
          entityInfos.put(ServiceTimelineMetricsConstants.ARTIFACT_TYPE,
              component.getArtifact().getType().toString());
        }
      }
      if (component.getResource() != null) {
        if (component.getResource().getProfile() != null) {
          entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_PROFILE,
              component.getResource().getProfile());
        }
        entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_CPU,
            component.getResource().getCpus());
        entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_MEMORY,
            component.getResource().getMemory());
      }

      if (component.getLaunchCommand() != null) {
        entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_COMMAND,
            component.getLaunchCommand());
      }
      if (component.getUniqueComponentSupport() != null) {
        entityInfos.put(
            ServiceTimelineMetricsConstants.UNIQUE_COMPONENT_SUPPORT,
            component.getUniqueComponentSupport().toString());
      }
      if (component.getRunPrivilegedContainer() != null) {
        entityInfos.put(
            ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER,
            component.getRunPrivilegedContainer().toString());
      }
      if (component.getPlacementPolicy() != null) {
        entityInfos.put(ServiceTimelineMetricsConstants.PLACEMENT_POLICY,
            component.getPlacementPolicy().getLabel());
      }
      entity.addInfo(entityInfos);

      putEntity(entity);

      // publish component specific configurations
      publishConfigurations(component.getConfiguration(), component.getName(),
          ServiceTimelineEntityType.COMPONENT.toString(), false);
    }
  }

  /**
   * Publishes the properties, environment and config-file properties of a
   * configuration as configs of the given entity.
   *
   * @param configuration configuration to publish; may be null
   * @param entityId id of the entity the configs belong to
   * @param entityType type of the entity the configs belong to
   * @param isServiceAttemptEntity when true, the slider client XML
   *          properties are published first
   */
  private void publishConfigurations(Configuration configuration,
      String entityId, String entityType, boolean isServiceAttemptEntity) {
    if (isServiceAttemptEntity) {
      // publish slider-client.xml properties at service level
      publishConfigurations(SliderUtils.loadSliderClientXML().iterator(),
          entityId, entityType);
    }
    if (configuration == null) {
      return;
    }
    publishConfigMap(configuration.getProperties(), entityId, entityType);
    publishConfigMap(configuration.getEnv(), entityId, entityType);

    if (configuration.getFiles() != null) {
      for (ConfigFile configFile : configuration.getFiles()) {
        if (configFile != null) {
          publishConfigMap(configFile.getProps(), entityId, entityType);
        }
      }
    }
  }

  /** Publishes the entries of a config map, ignoring null or empty maps. */
  private void publishConfigMap(Map<String, String> configs, String entityId,
      String entityType) {
    if (configs != null && !configs.isEmpty()) {
      publishConfigurations(configs.entrySet().iterator(), entityId,
          entityType);
    }
  }

  /**
   * Publishes config entries as one or more entities of the given id and
   * type. Entries with a null key or a null value are skipped.
   *
   * @param iterator config entries to publish
   * @param entityId id of the entity the configs belong to
   * @param entityType type of the entity the configs belong to
   */
  private void publishConfigurations(Iterator<Entry<String, String>> iterator,
      String entityId, String entityType) {
    int configSize = 0;
    TimelineEntity entity = createTimelineEntity(entityId, entityType);
    while (iterator.hasNext()) {
      Entry<String, String> entry = iterator.next();
      String key = entry.getKey();
      String value = entry.getValue();
      if (key == null || value == null) {
        log.debug("Skipping config entry with null key or value for {}",
            entityId);
        continue;
      }
      int size = key.length() + value.length();
      configSize += size;
      // Configs are split into multiple entities once the accumulated size
      // exceeds ATS_CONFIG_PUBLISH_SIZE_BYTES (10 * 1024).
      if (configSize > ATS_CONFIG_PUBLISH_SIZE_BYTES) {
        // An oversized first entry must not trigger an empty publish.
        if (!entity.getConfigs().isEmpty()) {
          putEntity(entity);
          entity = createTimelineEntity(entityId, entityType);
        }
        configSize = size;
      }
      entity.addConfig(key, value);
    }
    if (!entity.getConfigs().isEmpty()) {
      putEntity(entity);
    }
  }

  /**
   * Called from ServiceMetricsSink at regular interval of time.
   *
   * @param metrics of service or components
   * @param entityId Id of entity
   * @param entityType Type of entity
   * @param timestamp timestamp recorded with every metric value
   */
  public void publishMetrics(Iterable<AbstractMetric> metrics, String entityId,
      String entityType, long timestamp) {
    TimelineEntity entity = createTimelineEntity(entityId, entityType);
    Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>();
    for (AbstractMetric metric : metrics) {
      TimelineMetric timelineMetric = new TimelineMetric();
      timelineMetric.setId(metric.name());
      timelineMetric.addValue(timestamp, metric.value());
      entityMetrics.add(timelineMetric);
    }
    entity.setMetrics(entityMetrics);
    putEntity(entity);
  }

  /** Builds a timeline event with the given id and timestamp. */
  private TimelineEvent newEvent(String eventId, long timestamp) {
    TimelineEvent event = new TimelineEvent();
    event.setId(eventId);
    event.setTimestamp(timestamp);
    return event;
  }

  /** Creates an entity of type SERVICE_ATTEMPT with the given id. */
  private TimelineEntity createServiceAttemptEntity(String serviceId) {
    return createTimelineEntity(serviceId,
        ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
  }

  /** Creates an entity of type COMPONENT_INSTANCE with the given id. */
  private TimelineEntity createComponentInstanceEntity(String instanceId) {
    return createTimelineEntity(instanceId,
        ServiceTimelineEntityType.COMPONENT_INSTANCE.toString());
  }

  /** Creates an entity of type COMPONENT with the given id. */
  private TimelineEntity createComponentEntity(String componentId) {
    return createTimelineEntity(componentId,
        ServiceTimelineEntityType.COMPONENT.toString());
  }

  /** Creates an entity carrying only the given id and type. */
  private TimelineEntity createTimelineEntity(String entityId,
      String entityType) {
    TimelineEntity entity = new TimelineEntity();
    entity.setId(entityId);
    entity.setType(entityType);
    return entity;
  }

  /**
   * Hands the entity to the timeline client. Any exception is logged and
   * swallowed; a missing client is logged as an error.
   *
   * @param entity entity to publish
   */
  private void putEntity(TimelineEntity entity) {
    try {
      // Guarded because dumping the entity to JSON is only needed for debug.
      if (log.isDebugEnabled()) {
        log.debug("Publishing the entity {}, JSON-style content: {}", entity,
            TimelineUtils.dumpTimelineRecordtoJSON(entity));
      }
      if (timelineClient != null) {
        timelineClient.putEntitiesAsync(entity);
      } else {
        log.error("Seems like client has been removed before the entity "
            + "could be published for {}", entity);
      }
    } catch (Exception e) {
      log.error("Error when publishing entity {}", entity, e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/164c0c4c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java new file mode 100644 index 0000000..72f7842 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * ATS implementation + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.service.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org