http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java new file mode 100644 index 0000000..add2475 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.registry; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; + +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; +import org.apache.hadoop.yarn.service.utils.SliderUtils; + +import java.io.IOException; +import java.util.List; + +import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.join; + +/** + * Registry view for providers. This tracks where the service + * is registered, offers access to the record and other things. + */ +public class YarnRegistryViewForProviders { + private static final Log LOG = + LogFactory.getLog(YarnRegistryViewForProviders.class); + + private final RegistryOperations registryOperations; + private final String user; + private final String sliderServiceClass; + private final String instanceName; + /** + * Record used where the service registered itself. 
+ * Null until the service is registered + */ + private ServiceRecord selfRegistration; + + /** + * Path where record was registered + * Null until the service is registered + */ + private String selfRegistrationPath; + + public YarnRegistryViewForProviders(RegistryOperations registryOperations, + String user, + String sliderServiceClass, + String instanceName, + ApplicationAttemptId applicationAttemptId) { + Preconditions.checkArgument(registryOperations != null, + "null registry operations"); + Preconditions.checkArgument(user != null, "null user"); + Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass), + "unset service class"); + Preconditions.checkArgument(SliderUtils.isSet(instanceName), + "instanceName"); + Preconditions.checkArgument(applicationAttemptId != null, + "null applicationAttemptId"); + this.registryOperations = registryOperations; + this.user = user; + this.sliderServiceClass = sliderServiceClass; + this.instanceName = instanceName; + } + + public String getUser() { + return user; + } + + + private void setSelfRegistration(ServiceRecord selfRegistration) { + this.selfRegistration = selfRegistration; + } + + /** + * Get the path to where the service has registered itself. + * Null until the service is registered + * @return the service registration path. + */ + public String getSelfRegistrationPath() { + return selfRegistrationPath; + } + + /** + * Get the absolute path to where the service has registered itself. + * This includes the base registry path + * Null until the service is registered + * @return the service registration path. 
+ */ + public String getAbsoluteSelfRegistrationPath() { + if (selfRegistrationPath == null) { + return null; + } + String root = registryOperations.getConfig().getTrimmed( + RegistryConstants.KEY_REGISTRY_ZK_ROOT, + RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); + return RegistryPathUtils.join(root, selfRegistrationPath); + } + + /** + * Add a component under the slider name/entry + * @param componentName component name + * @param record record to put + * @throws IOException + */ + public void putComponent(String componentName, + ServiceRecord record) throws + IOException { + putComponent(sliderServiceClass, instanceName, + componentName, + record); + } + + /** + * Add a component + * @param serviceClass service class to use under ~user + * @param componentName component name + * @param record record to put + * @throws IOException + */ + public void putComponent(String serviceClass, + String serviceName, + String componentName, + ServiceRecord record) throws IOException { + String path = RegistryUtils.componentPath( + user, serviceClass, serviceName, componentName); + registryOperations.mknode(RegistryPathUtils.parentOf(path), true); + registryOperations.bind(path, record, BindFlags.OVERWRITE); + } + + /** + * Add a service under a path, optionally purging any history + * @param username user + * @param serviceClass service class to use under ~user + * @param serviceName name of the service + * @param record service record + * @param deleteTreeFirst perform recursive delete of the path first. 
+ * @return the path the service was created at + * @throws IOException + */ + public String putService(String username, + String serviceClass, + String serviceName, + ServiceRecord record, + boolean deleteTreeFirst) throws IOException { + String path = RegistryUtils.servicePath( + username, serviceClass, serviceName); + if (deleteTreeFirst) { + registryOperations.delete(path, true); + } + registryOperations.mknode(RegistryPathUtils.parentOf(path), true); + registryOperations.bind(path, record, BindFlags.OVERWRITE); + return path; + } + + /** + * Add a service under a path for the current user + * @param record service record + * @param deleteTreeFirst perform recursive delete of the path first + * @return the path the service was created at + * @throws IOException + */ + public String registerSelf( + ServiceRecord record, + boolean deleteTreeFirst) throws IOException { + selfRegistrationPath = + putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst); + setSelfRegistration(record); + return selfRegistrationPath; + } + + /** + * Delete a component + * @param containerId component name + * @throws IOException + */ + public void deleteComponent(ComponentInstanceId instanceId, + String containerId) throws IOException { + String path = RegistryUtils.componentPath( + user, sliderServiceClass, instanceName, + containerId); + LOG.info(instanceId + ": Deleting registry path " + path); + registryOperations.delete(path, false); + } + + /** + * Delete the children of a path -but not the path itself. 
+ * It is not an error if the path does not exist + * @param path path to delete + * @param recursive flag to request recursive deletes + * @throws IOException IO problems + */ + public void deleteChildren(String path, boolean recursive) throws IOException { + List<String> childNames = null; + try { + childNames = registryOperations.list(path); + } catch (PathNotFoundException e) { + return; + } + for (String childName : childNames) { + String child = join(path, childName); + registryOperations.delete(child, recursive); + } + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java new file mode 100644 index 0000000..cf4e836 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.timelineservice; + +import org.apache.commons.configuration2.SubsetConfiguration; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.hadoop.metrics2.MetricsTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Write the metrics to a ATSv2. Generally, this class is instantiated via + * hadoop-metrics2 property files. Specifically, you would create this class by + * adding the following to by This would actually be set as: <code> + * [prefix].sink.[some instance name].class + * =org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink + * </code>, where <tt>prefix</tt> is "atsv2": and <tt>some instance name</tt> is + * just any unique name, so properties can be differentiated if there are + * multiple sinks of the same type created + */ +public class ServiceMetricsSink implements MetricsSink { + + private static final Logger log = + LoggerFactory.getLogger(ServiceMetricsSink.class); + + private ServiceTimelinePublisher serviceTimelinePublisher; + + public ServiceMetricsSink() { + + } + + public ServiceMetricsSink(ServiceTimelinePublisher publisher) { + serviceTimelinePublisher = publisher; + } + + /** + * Publishes service and component metrics to ATS. + */ + @Override + public void putMetrics(MetricsRecord record) { + if (serviceTimelinePublisher.isStopped()) { + log.warn("ServiceTimelinePublisher has stopped. 
" + + "Not publishing any more metrics to ATS."); + return; + } + + boolean isServiceMetrics = false; + boolean isComponentMetrics = false; + String appId = null; + for (MetricsTag tag : record.tags()) { + if (tag.name().equals("type") && tag.value().equals("service")) { + isServiceMetrics = true; + } else if (tag.name().equals("type") && tag.value().equals("component")) { + isComponentMetrics = true; + break; // if component metrics, no more information required from tag so + // break the loop + } else if (tag.name().equals("appId")) { + appId = tag.value(); + } + } + + if (isServiceMetrics && appId != null) { + if (log.isDebugEnabled()) { + log.debug("Publishing service metrics. " + record); + } + serviceTimelinePublisher.publishMetrics(record.metrics(), appId, + ServiceTimelineEntityType.SERVICE_ATTEMPT.toString(), + record.timestamp()); + } else if (isComponentMetrics) { + if (log.isDebugEnabled()) { + log.debug("Publishing Component metrics. " + record); + } + serviceTimelinePublisher.publishMetrics(record.metrics(), record.name(), + ServiceTimelineEntityType.COMPONENT.toString(), record.timestamp()); + } + } + + @Override + public void init(SubsetConfiguration conf) { + } + + @Override + public void flush() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java new file mode 100644 
/**
 * Types of the service entities that are published to ATS.
 */
public enum ServiceTimelineEntityType {
  /** Entity type under which service information is published. */
  SERVICE_ATTEMPT,

  /** Entity type under which component information is published. */
  COMPONENT,

  /** Entity type under which component instance information is published. */
  COMPONENT_INSTANCE
}
/**
 * Identifiers of the events that are stored in ATS.
 */
public enum ServiceTimelineEvent {
  /** Event added to the service entity when the attempt is registered. */
  SERVICE_ATTEMPT_REGISTERED,

  /** Event added to the service entity when the attempt is unregistered. */
  SERVICE_ATTEMPT_UNREGISTERED,

  /** Event added to a component instance entity when it has started. */
  COMPONENT_INSTANCE_REGISTERED,

  /** Event added to a component instance entity when it has finished. */
  COMPONENT_INSTANCE_UNREGISTERED,

  /** Event added to a component instance entity when it is updated. */
  COMPONENT_INSTANCE_UPDATED
}
/**
 * Constants which are stored as keys in ATS.
 *
 * <p>This is a constants holder only and cannot be instantiated.
 */
public final class ServiceTimelineMetricsConstants {

  public static final String URI = "URI";

  public static final String NAME = "NAME";

  public static final String STATE = "STATE";

  public static final String EXIT_STATUS_CODE = "EXIT_STATUS_CODE";

  public static final String EXIT_REASON = "EXIT_REASON";

  public static final String DIAGNOSTICS_INFO = "DIAGNOSTICS_INFO";

  public static final String LAUNCH_TIME = "LAUNCH_TIME";

  public static final String QUICK_LINKS = "QUICK_LINKS";

  public static final String LAUNCH_COMMAND = "LAUNCH_COMMAND";

  public static final String TOTAL_CONTAINERS = "NUMBER_OF_CONTAINERS";

  public static final String RUNNING_CONTAINERS =
      "NUMBER_OF_RUNNING_CONTAINERS";

  /**
   * Artifacts constants.
   */
  public static final String ARTIFACT_ID = "ARTIFACT_ID";

  public static final String ARTIFACT_TYPE = "ARTIFACT_TYPE";

  public static final String ARTIFACT_URI = "ARTIFACT_URI";

  /**
   * Resource constants.
   */
  public static final String RESOURCE_CPU = "RESOURCE_CPU";

  public static final String RESOURCE_MEMORY = "RESOURCE_MEMORY";

  public static final String RESOURCE_PROFILE = "RESOURCE_PROFILE";

  /**
   * Component instance constants.
   */
  public static final String IP = "IP";

  public static final String HOSTNAME = "HOSTNAME";

  public static final String BARE_HOST = "BARE_HOST";

  public static final String COMPONENT_NAME = "COMPONENT_NAME";

  public static final String COMPONENT_INSTANCE_NAME = "COMPONENT_INSTANCE_NAME";

  /**
   * Component constants.
   */
  public static final String DEPENDENCIES = "DEPENDENCIES";

  public static final String DESCRIPTION = "DESCRIPTION";

  public static final String RUN_PRIVILEGED_CONTAINER =
      "RUN_PRIVILEGED_CONTAINER";

  public static final String PLACEMENT_POLICY = "PLACEMENT_POLICY";

  /** Not instantiable: this class only holds constants. */
  private ServiceTimelineMetricsConstants() {
  }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.timelineservice; + +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ConfigFile; +import org.apache.hadoop.yarn.service.api.records.Configuration; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO; + +/** + * A single service that publishes all the Timeline Entities. + */ +public class ServiceTimelinePublisher extends CompositeService { + + // Number of bytes of config which can be published in one shot to ATSv2. 
+ public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024; + + private TimelineV2Client timelineClient; + + private volatile boolean stopped = false; + + private static final Logger log = + LoggerFactory.getLogger(ServiceTimelinePublisher.class); + + @Override + protected void serviceInit(org.apache.hadoop.conf.Configuration configuration) + throws Exception { + addService(timelineClient); + } + + + @Override + protected void serviceStop() throws Exception { + stopped = true; + super.serviceStop(); + } + + public boolean isStopped() { + return stopped; + } + + public ServiceTimelinePublisher(TimelineV2Client client) { + super(ServiceTimelinePublisher.class.getName()); + timelineClient = client; + } + + public void serviceAttemptRegistered(Service service, + org.apache.hadoop.conf.Configuration systemConf) { + long currentTimeMillis = service.getLaunchTime() == null + ? System.currentTimeMillis() : service.getLaunchTime().getTime(); + + TimelineEntity entity = createServiceAttemptEntity(service.getId()); + entity.setCreatedTime(currentTimeMillis); + + // create info keys + Map<String, Object> entityInfos = new HashMap<String, Object>(); + entityInfos.put(ServiceTimelineMetricsConstants.NAME, service.getName()); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, + service.getState().toString()); + entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_TIME, + currentTimeMillis); + entity.addInfo(ServiceTimelineMetricsConstants.QUICK_LINKS, + service.getQuicklinks()); + entity.addInfo(entityInfos); + + // add an event + TimelineEvent startEvent = new TimelineEvent(); + startEvent.setId(ServiceTimelineEvent.SERVICE_ATTEMPT_REGISTERED.toString()); + startEvent.setTimestamp(currentTimeMillis); + entity.addEvent(startEvent); + + // publish before configurations published + putEntity(entity); + + // publish system config - YarnConfiguration + populateTimelineEntity(systemConf.iterator(), service.getId(), + 
ServiceTimelineEntityType.SERVICE_ATTEMPT.toString()); + // publish user conf + publishUserConf(service.getConfiguration(), service.getId(), + ServiceTimelineEntityType.SERVICE_ATTEMPT.toString()); + + // publish component as separate entity. + publishComponents(service.getComponents()); + } + + public void serviceAttemptUpdated(Service service) { + TimelineEntity entity = createServiceAttemptEntity(service.getId()); + entity.addInfo(ServiceTimelineMetricsConstants.QUICK_LINKS, + service.getQuicklinks()); + putEntity(entity); + } + + public void serviceAttemptUnregistered(ServiceContext context, + String diagnostics) { + TimelineEntity entity = createServiceAttemptEntity( + context.attemptId.getApplicationId().toString()); + Map<String, Object> entityInfos = new HashMap<String, Object>(); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, + FinalApplicationStatus.ENDED); + entityInfos.put(DIAGNOSTICS_INFO, diagnostics); + entity.addInfo(entityInfos); + + // add an event + TimelineEvent finishEvent = new TimelineEvent(); + finishEvent + .setId(ServiceTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString()); + finishEvent.setTimestamp(System.currentTimeMillis()); + entity.addEvent(finishEvent); + + putEntity(entity); + } + + public void componentInstanceStarted(Container container, + ComponentInstance instance) { + + TimelineEntity entity = createComponentInstanceEntity(container.getId()); + entity.setCreatedTime(container.getLaunchTime().getTime()); + + // create info keys + Map<String, Object> entityInfos = new HashMap<String, Object>(); + entityInfos.put(ServiceTimelineMetricsConstants.BARE_HOST, + container.getBareHost()); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, + container.getState().toString()); + entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_TIME, + container.getLaunchTime().getTime()); + entityInfos.put(ServiceTimelineMetricsConstants.COMPONENT_NAME, + instance.getCompName()); + 
entityInfos.put(ServiceTimelineMetricsConstants.COMPONENT_INSTANCE_NAME, + instance.getCompInstanceName()); + entity.addInfo(entityInfos); + + // add an event + TimelineEvent startEvent = new TimelineEvent(); + startEvent + .setId(ServiceTimelineEvent.COMPONENT_INSTANCE_REGISTERED.toString()); + startEvent.setTimestamp(container.getLaunchTime().getTime()); + entity.addEvent(startEvent); + + putEntity(entity); + } + + public void componentInstanceFinished(ComponentInstance instance, + int exitCode, ContainerState state, String diagnostics) { + TimelineEntity entity = createComponentInstanceEntity( + instance.getContainer().getId().toString()); + + // create info keys + Map<String, Object> entityInfos = new HashMap<String, Object>(); + entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE, + exitCode); + entityInfos.put(DIAGNOSTICS_INFO, diagnostics); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, state); + entity.addInfo(entityInfos); + + // add an event + TimelineEvent startEvent = new TimelineEvent(); + startEvent + .setId(ServiceTimelineEvent.COMPONENT_INSTANCE_UNREGISTERED.toString()); + startEvent.setTimestamp(System.currentTimeMillis()); + entity.addEvent(startEvent); + + putEntity(entity); + } + + public void componentInstanceUpdated(Container container) { + TimelineEntity entity = createComponentInstanceEntity(container.getId()); + + // create info keys + Map<String, Object> entityInfos = new HashMap<String, Object>(); + entityInfos.put(ServiceTimelineMetricsConstants.IP, container.getIp()); + entityInfos.put(ServiceTimelineMetricsConstants.HOSTNAME, + container.getHostname()); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, + container.getState().toString()); + entity.addInfo(entityInfos); + + TimelineEvent updateEvent = new TimelineEvent(); + updateEvent + .setId(ServiceTimelineEvent.COMPONENT_INSTANCE_UPDATED.toString()); + updateEvent.setTimestamp(System.currentTimeMillis()); + entity.addEvent(updateEvent); + + 
putEntity(entity); + } + + private void publishComponents(List<Component> components) { + long currentTimeMillis = System.currentTimeMillis(); + for (Component component : components) { + TimelineEntity entity = createComponentEntity(component.getName()); + entity.setCreatedTime(currentTimeMillis); + + // create info keys + Map<String, Object> entityInfos = new HashMap<String, Object>(); + entityInfos.put(ServiceTimelineMetricsConstants.ARTIFACT_ID, + component.getArtifact().getId()); + entityInfos.put(ServiceTimelineMetricsConstants.ARTIFACT_TYPE, + component.getArtifact().getType().toString()); + if (component.getResource().getProfile() != null) { + entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_PROFILE, + component.getResource().getProfile()); + } + entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_CPU, + component.getResource().getCpus()); + entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_MEMORY, + component.getResource().getMemory()); + + if (component.getLaunchCommand() != null) { + entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_COMMAND, + component.getLaunchCommand()); + } + entityInfos.put(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER, + component.getRunPrivilegedContainer().toString()); + if (component.getPlacementPolicy() != null) { + entityInfos.put(ServiceTimelineMetricsConstants.PLACEMENT_POLICY, + component.getPlacementPolicy().getLabel()); + } + entity.addInfo(entityInfos); + + putEntity(entity); + + // publish component specific configurations + publishUserConf(component.getConfiguration(), component.getName(), + ServiceTimelineEntityType.COMPONENT.toString()); + } + } + + private void publishUserConf(Configuration configuration, + String entityId, String entityType) { + populateTimelineEntity(configuration.getProperties().entrySet().iterator(), + entityId, entityType); + + populateTimelineEntity(configuration.getEnv().entrySet().iterator(), + entityId, entityType); + + for (ConfigFile configFile : 
configuration.getFiles()) { + populateTimelineEntity(configFile.getProps().entrySet().iterator(), + entityId, entityType); + } + } + + private void populateTimelineEntity(Iterator<Entry<String, String>> iterator, + String entityId, String entityType) { + int configSize = 0; + TimelineEntity entity = createTimelineEntity(entityId, entityType); + while (iterator.hasNext()) { + Entry<String, String> entry = iterator.next(); + int size = entry.getKey().length() + entry.getValue().length(); + configSize += size; + // Configs are split into multiple entities if they exceed 100kb in size. + if (configSize > ATS_CONFIG_PUBLISH_SIZE_BYTES) { + if (entity.getConfigs().size() > 0) { + putEntity(entity); + entity = createTimelineEntity(entityId, entityType); + } + configSize = size; + } + entity.addConfig(entry.getKey(), entry.getValue()); + } + if (configSize > 0) { + putEntity(entity); + } + } + + /** + * Called from ServiceMetricsSink at regular interval of time. + * @param metrics of service or components + * @param entityId Id of entity + * @param entityType Type of entity + * @param timestamp + */ + public void publishMetrics(Iterable<AbstractMetric> metrics, String entityId, + String entityType, long timestamp) { + TimelineEntity entity = createTimelineEntity(entityId, entityType); + Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>(); + for (AbstractMetric metric : metrics) { + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setId(metric.name()); + timelineMetric.addValue(timestamp, metric.value()); + entityMetrics.add(timelineMetric); + } + entity.setMetrics(entityMetrics); + putEntity(entity); + } + + private TimelineEntity createServiceAttemptEntity(String serviceId) { + TimelineEntity entity = createTimelineEntity(serviceId, + ServiceTimelineEntityType.SERVICE_ATTEMPT.toString()); + return entity; + } + + private TimelineEntity createComponentInstanceEntity(String instanceId) { + TimelineEntity entity = 
createTimelineEntity(instanceId, + ServiceTimelineEntityType.COMPONENT_INSTANCE.toString()); + return entity; + } + + private TimelineEntity createComponentEntity(String componentId) { + TimelineEntity entity = createTimelineEntity(componentId, + ServiceTimelineEntityType.COMPONENT.toString()); + return entity; + } + + private TimelineEntity createTimelineEntity(String entityId, + String entityType) { + TimelineEntity entity = new TimelineEntity(); + entity.setId(entityId); + entity.setType(entityType); + return entity; + } + + private void putEntity(TimelineEntity entity) { + try { + if (log.isDebugEnabled()) { + log.debug("Publishing the entity " + entity + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + if (timelineClient != null) { + timelineClient.putEntitiesAsync(entity); + } else { + log.error("Seems like client has been removed before the entity " + + "could be published for " + entity); + } + } catch (Exception e) { + log.error("Error when publishing entity " + entity, e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java new file mode 100644 index 0000000..72f7842 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java @@ -0,0 +1,27 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * ATS implementation + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.service.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java new file mode 100644 index 0000000..2607c08 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.utils; + +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; + +import java.io.IOException; + +/** + * Persistence of {@link SerializedApplicationReport} + * + */ +public class ApplicationReportSerDeser + extends JsonSerDeser<SerializedApplicationReport> { + public ApplicationReportSerDeser() { + super(SerializedApplicationReport.class); + } + + + private static final ApplicationReportSerDeser + staticinstance = new ApplicationReportSerDeser(); + + /** + * Convert an instance to a JSON string -sync access to a shared ser/deser + * object instance + * @param instance object to convert + * @return a JSON string description + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public static String toString(SerializedApplicationReport instance) + throws IOException, JsonGenerationException, JsonMappingException { + synchronized (staticinstance) { + return staticinstance.toJson(instance); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java new file mode 100644 index 0000000..86896b2 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.utils; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.encodeForRegistry; +import static org.apache.hadoop.registry.client.binding.RegistryUtils.convertUsername; +import static org.apache.hadoop.registry.client.binding.RegistryUtils.getCurrentUsernameUnencoded; +import static org.apache.hadoop.registry.client.binding.RegistryUtils.servicePath; + +/** + * Generic code to get the URLs for clients via the registry + */ +public class ClientRegistryBinder { + private static final Logger log = + LoggerFactory.getLogger(ClientRegistryBinder.class); + + private final RegistryOperations operations; + + public ClientRegistryBinder(RegistryOperations operations) { + this.operations = operations; + } + + /** + * Buld the user path -switches to the system path if the user is "". 
+ * It also cross-converts the username to ascii via punycode + * @param username username or "" + * @return the path to the user + */ + public static String homePathForUser(String username) { + Preconditions.checkArgument(username != null, "null user"); + + // catch recursion + if (username.startsWith(RegistryConstants.PATH_USERS)) { + return username; + } + + if (username.isEmpty()) { + return RegistryConstants.PATH_SYSTEM_SERVICES; + } + + // convert username to registry name + String convertedName = convertUsername(username); + + return RegistryPathUtils.join(RegistryConstants.PATH_USERS, + encodeForRegistry(convertedName)); + } + + /** + * Get the current username, before any encoding has been applied. + * @return the current user from the kerberos identity, falling back + * to the user and/or env variables. + */ + public static String currentUsernameUnencoded() { + String env_hadoop_username = System.getenv( + RegistryInternalConstants.HADOOP_USER_NAME); + return getCurrentUsernameUnencoded(env_hadoop_username); + } + + /** + * Qualify a user. 
+ * <ol> + * <li> <code>"~"</code> maps to user home path home</li> + * <li> <code>"~user"</code> maps to <code>/users/$user</code></li> + * <li> <code>"/"</code> maps to <code>/services/</code></li> + * </ol> + * @param user the username + * @return the base path + */ + public static String qualifyUser(String user) { + // qualify the user + String t = user.trim(); + if (t.startsWith("/")) { + // already resolved + return t; + } else if (t.equals("~")) { + // self + return currentUsernameUnencoded(); + } else if (t.startsWith("~")) { + // another user + // convert username to registry name + String convertedName = convertUsername(t.substring(1)); + + return RegistryPathUtils.join(RegistryConstants.PATH_USERS, + encodeForRegistry(convertedName)); + } else { + return "/" + t; + } + } + + /** + * Look up an external REST API + * @param user user which will be qualified as per {@link #qualifyUser(String)} + * @param serviceClass service class + * @param instance instance name + * @param api API + * @return the API, or an exception is raised. + * @throws IOException + */ + public String lookupExternalRestAPI(String user, + String serviceClass, + String instance, + String api) + throws IOException { + String qualified = qualifyUser(user); + String path = servicePath(qualified, serviceClass, instance); + String restAPI = resolveExternalRestAPI(api, path); + if (restAPI == null) { + throw new PathNotFoundException(path + " API " + api); + } + return restAPI; + } + + /** + * Resolve a service record then return an external REST API exported it. + * + * @param api API to resolve + * @param path path of the service record + * @return null if the record exists but the API is absent or it has no + * REST endpoints. 
+ * @throws IOException resolution problems, as covered in + * {@link RegistryOperations#resolve(String)} + */ + protected String resolveExternalRestAPI(String api, String path) throws + IOException { + ServiceRecord record = operations.resolve(path); + return lookupRestAPI(record, api, true); + } + + /** + * Look up an external REST API endpoint + * @param record service record + * @param api URI of api + * @param external flag to indicate this is an external record + * @return the first endpoint of the implementation, or null if there + * is no entry for the API, implementation or it's the wrong type. + */ + public static String lookupRestAPI(ServiceRecord record, + String api, boolean external) throws InvalidRecordException { + try { + String url = null; + Endpoint endpoint = getEndpoint(record, api, external); + List<String> addresses = + RegistryTypeUtils.retrieveAddressesUriType(endpoint); + if (addresses != null && !addresses.isEmpty()) { + url = addresses.get(0); + } + return url; + } catch (InvalidRecordException e) { + log.debug("looking for API {}", api, e); + return null; + } + } + + /** + * Get an endpont by API + * @param record service record + * @param api API + * @param external flag to indicate this is an external record + * @return the endpoint or null + */ + public static Endpoint getEndpoint(ServiceRecord record, + String api, + boolean external) { + return external ? 
record.getExternalEndpoint(api) + : record.getInternalEndpoint(api); + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java new file mode 100644 index 0000000..9f0e5d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Some general comparators.
 */
public class Comparators {

  /**
   * Orders {@link Long} values in ascending numeric order.
   */
  public static class LongComparator implements Comparator<Long>, Serializable {
    @Override
    public int compare(Long o1, Long o2) {
      return Long.compare(o1, o2);
    }
  }

  /**
   * Orders {@link Long} values in descending numeric order.
   */
  public static class InvertedLongComparator
      implements Comparator<Long>, Serializable {
    @Override
    public int compare(Long o1, Long o2) {
      // operands swapped to invert the ordering
      return Long.compare(o2, o1);
    }
  }

  /**
   * Little template class to reverse any comparator: it delegates to the
   * wrapped comparator with the two arguments exchanged.
   *
   * @param <T> the type that is being compared
   */
  public static class ComparatorReverser<T> implements Comparator<T>,
      Serializable {

    /** The comparator whose ordering is reversed. */
    final Comparator<T> instance;

    /**
     * @param instance comparator to reverse
     */
    public ComparatorReverser(Comparator<T> instance) {
      this.instance = instance;
    }

    @Override
    public int compare(T first, T second) {
      return instance.compare(second, first);
    }
  }
}
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.service.exceptions.BadConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringWriter; +import java.net.URL; +import java.util.Map; + +/** + * Methods to aid in config, both in the Configuration class and + * with other parts of setting up Slider-initated processes. 
+ * + * Some of the methods take an argument of a map iterable for their sources; this allows + * the same method + */ +public class ConfigHelper { + private static final Logger log = LoggerFactory.getLogger(ConfigHelper.class); + + /** + * Set an entire map full of values + * + * @param config config to patch + * @param map map of data + * @param origin origin data + */ + public static void addConfigMap(Configuration config, + Map<String, String> map, + String origin) throws BadConfigException { + addConfigMap(config, map.entrySet(), origin); + } + + /** + * Set an entire map full of values + * + * @param config config to patch + * @param map map of data + * @param origin origin data + */ + public static void addConfigMap(Configuration config, + Iterable<Map.Entry<String, String>> map, + String origin) throws BadConfigException { + for (Map.Entry<String, String> mapEntry : map) { + String key = mapEntry.getKey(); + String value = mapEntry.getValue(); + if (value == null) { + throw new BadConfigException("Null value for property " + key); + } + config.set(key, value, origin); + } + } + + /** + * Convert to an XML string + * @param conf configuration + * @return conf + * @throws IOException + */ + public static String toXml(Configuration conf) throws IOException { + StringWriter writer = new StringWriter(); + conf.writeXml(writer); + return writer.toString(); + } + + + /** + * Register a resource as a default resource. + * Do not attempt to use this unless you understand that the + * order in which default resources are loaded affects the outcome, + * and that subclasses of Configuration often register new default + * resources + * @param resource the resource name + * @return the URL or null + */ + public static URL registerDefaultResource(String resource) { + URL resURL = getResourceUrl(resource); + if (resURL != null) { + Configuration.addDefaultResource(resource); + } + return resURL; + } + + /** + * Load a configuration from a resource on this classpath. 
+ * If the resource is not found, an empty configuration is returned + * @param resource the resource name + * @return the loaded configuration. + */ + public static Configuration loadFromResource(String resource) { + Configuration conf = new Configuration(false); + URL resURL = getResourceUrl(resource); + if (resURL != null) { + log.debug("loaded resources from {}", resURL); + conf.addResource(resource); + } else{ + log.debug("failed to find {} on the classpath", resource); + } + return conf; + + } + + /** + * Get the URL to a resource, null if not on the CP + * @param resource resource to look for + * @return the URL or null + */ + public static URL getResourceUrl(String resource) { + return ConfigHelper.class.getClassLoader() + .getResource(resource); + } + + /** + * This goes through the keyset of one configuration and retrieves each value + * from a value source -a different or the same configuration. This triggers + * the property resolution process of the value, resolving any variables against + * in-config or inherited configurations + * @param keysource source of keys + * @param valuesource the source of values + * @return a new configuration where <code>foreach key in keysource, get(key)==valuesource.get(key)</code> + */ + public static Configuration resolveConfiguration( + Iterable<Map.Entry<String, String>> keysource, + Configuration valuesource) { + Configuration result = new Configuration(false); + for (Map.Entry<String, String> entry : keysource) { + String key = entry.getKey(); + String value = valuesource.get(key); + Preconditions.checkState(value != null, + "no reference for \"%s\" in values", key); + result.set(key, value); + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java new file mode 100644 index 0000000..a969be9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.utils; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.service.api.records.ConfigFormat; +import org.apache.hadoop.yarn.service.utils.SliderFileSystem; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ConfigUtils { + public static final String TEMPLATE_FILE = "template.file"; + + public static String replaceProps(Map<String, String> config, String content) { + Map<String, String> tokens = new HashMap<>(); + for (Entry<String, String> entry : config.entrySet()) { + tokens.put("${" + entry.getKey() + "}", entry.getValue()); + tokens.put("{{" + entry.getKey() + "}}", entry.getValue()); + } + String value = content; + for (Map.Entry<String,String> token : tokens.entrySet()) { + value = value.replaceAll(Pattern.quote(token.getKey()), + Matcher.quoteReplacement(token.getValue())); + } + return value; + } + + public static Map<String, String> replacePropsInConfig( + Map<String, String> config, Map<String, String> env) { + Map<String, String> tokens = new HashMap<>(); + for (Entry<String, String> entry : env.entrySet()) { + tokens.put("${" + entry.getKey() + "}", entry.getValue()); + } + Map<String, String> newConfig = new HashMap<>(); + for (Entry<String, String> entry : config.entrySet()) { + String value = entry.getValue(); + for (Map.Entry<String,String> token : tokens.entrySet()) { + value = value.replaceAll(Pattern.quote(token.getKey()), + Matcher.quoteReplacement(token.getValue())); + } + newConfig.put(entry.getKey(), entry.getValue()); + } + return newConfig; + } + + public static void prepConfigForTemplateOutputter(ConfigFormat configFormat, + Map<String, String> config, SliderFileSystem fileSystem, + String clusterName, String fileName) throws IOException { + if (!configFormat.equals(ConfigFormat.TEMPLATE)) { + return; + } + Path templateFile = null; + if 
(config.containsKey(TEMPLATE_FILE)) { + templateFile = fileSystem.buildResourcePath(config.get(TEMPLATE_FILE)); + if (!fileSystem.isFile(templateFile)) { + templateFile = fileSystem.buildResourcePath(clusterName, + config.get(TEMPLATE_FILE)); + } + if (!fileSystem.isFile(templateFile)) { + throw new IOException("config specified template file " + config + .get(TEMPLATE_FILE) + " but " + templateFile + " doesn't exist"); + } + } + if (templateFile == null && fileName != null) { + templateFile = fileSystem.buildResourcePath(fileName); + if (!fileSystem.isFile(templateFile)) { + templateFile = fileSystem.buildResourcePath(clusterName, + fileName); + } + } + if (fileSystem.isFile(templateFile)) { + config.put("content", fileSystem.cat(templateFile)); + } else { + config.put("content", ""); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java new file mode 100644 index 0000000..281e1dfe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.service.conf.SliderExitCodes; +import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException; +import org.apache.hadoop.yarn.service.exceptions.ErrorStrings; +import org.apache.hadoop.yarn.service.exceptions.SliderException; +import org.apache.hadoop.yarn.util.Records; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.HashMap; 
+import java.util.Map; + +public class CoreFileSystem { + private static final Logger + log = LoggerFactory.getLogger(CoreFileSystem.class); + + private static final String UTF_8 = "UTF-8"; + + protected final FileSystem fileSystem; + protected final Configuration configuration; + + public CoreFileSystem(FileSystem fileSystem, Configuration configuration) { + Preconditions.checkNotNull(fileSystem, + "Cannot create a CoreFileSystem with a null FileSystem"); + Preconditions.checkNotNull(configuration, + "Cannot create a CoreFileSystem with a null Configuration"); + this.fileSystem = fileSystem; + this.configuration = configuration; + } + + public CoreFileSystem(Configuration configuration) throws IOException { + Preconditions.checkNotNull(configuration, + "Cannot create a CoreFileSystem with a null Configuration"); + this.fileSystem = FileSystem.get(configuration); + this.configuration = configuration; + } + + /** + * Get the temp path for this cluster + * @param clustername name of the cluster + * @return path for temp files (is not purged) + */ + public Path getTempPathForCluster(String clustername) { + Path clusterDir = buildClusterDirPath(clustername); + return new Path(clusterDir, YarnServiceConstants.TMP_DIR_PREFIX); + } + + /** + * Returns the underlying FileSystem for this object. 
+ * + * @return filesystem + */ + public FileSystem getFileSystem() { + return fileSystem; + } + + @Override + public String toString() { + final StringBuilder sb = + new StringBuilder("CoreFileSystem{"); + sb.append("fileSystem=").append(fileSystem.getUri()); + sb.append('}'); + return sb.toString(); + } + + /** + * Build up the path string for a cluster instance -no attempt to + * create the directory is made + * + * @param clustername name of the cluster + * @return the path for persistent data + */ + public Path buildClusterDirPath(String clustername) { + Preconditions.checkNotNull(clustername); + Path path = getBaseApplicationPath(); + return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername); + } + + + /** + * Build up the path string for keytab install location -no attempt to + * create the directory is made + * + * @return the path for keytab + */ + public Path buildKeytabInstallationDirPath(String keytabFolder) { + Preconditions.checkNotNull(keytabFolder); + Path path = getBaseApplicationPath(); + return new Path(path, YarnServiceConstants.KEYTAB_DIR + "/" + keytabFolder); + } + + /** + * Build up the path string for keytab install location -no attempt to + * create the directory is made + * + * @return the path for keytab installation location + */ + public Path buildKeytabPath(String keytabDir, String keytabName, String clusterName) { + Path homePath = getHomeDirectory(); + Path baseKeytabDir; + if (keytabDir != null) { + baseKeytabDir = new Path(homePath, keytabDir); + } else { + baseKeytabDir = new Path(buildClusterDirPath(clusterName), + YarnServiceConstants.KEYTAB_DIR); + } + return keytabName == null ? 
baseKeytabDir : + new Path(baseKeytabDir, keytabName); + } + + /** + * Build up the path string for resource install location -no attempt to + * create the directory is made + * + * @return the path for resource + */ + public Path buildResourcePath(String resourceFolder) { + Preconditions.checkNotNull(resourceFolder); + Path path = getBaseApplicationPath(); + return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + resourceFolder); + } + + /** + * Build up the path string for resource install location -no attempt to + * create the directory is made + * + * @return the path for resource + */ + public Path buildResourcePath(String dirName, String fileName) { + Preconditions.checkNotNull(dirName); + Preconditions.checkNotNull(fileName); + Path path = getBaseApplicationPath(); + return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + dirName + "/" + fileName); + } + + /** + * Create a directory with the given permissions. + * + * @param dir directory + * @param clusterPerms cluster permissions + * @throws IOException IO problem + * @throws BadClusterStateException any cluster state problem + */ + @SuppressWarnings("deprecation") + public void createWithPermissions(Path dir, FsPermission clusterPerms) throws + IOException, + BadClusterStateException { + if (fileSystem.isFile(dir)) { + // HADOOP-9361 shows some filesystems don't correctly fail here + throw new BadClusterStateException( + "Cannot create a directory over a file %s", dir); + } + log.debug("mkdir {} with perms {}", dir, clusterPerms); + //no mask whatoever + fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); + fileSystem.mkdirs(dir, clusterPerms); + //and force set it anyway just to make sure + fileSystem.setPermission(dir, clusterPerms); + } + + /** + * Verify that the cluster directory is not present + * + * @param clustername name of the cluster + * @param clusterDirectory actual directory to look for + * @throws IOException trouble with FS + * @throws 
SliderException If the directory exists + */ + public void verifyClusterDirectoryNonexistent(String clustername, + Path clusterDirectory) + throws IOException, SliderException { + if (fileSystem.exists(clusterDirectory)) { + throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS, + ErrorStrings.PRINTF_E_INSTANCE_ALREADY_EXISTS, clustername, + clusterDirectory); + } + } + /** + * Verify that the given directory is not present + * + * @param clusterDirectory actual directory to look for + * @throws IOException trouble with FS + * @throws SliderException If the directory exists + */ + public void verifyDirectoryNonexistent(Path clusterDirectory) throws + IOException, + SliderException { + if (fileSystem.exists(clusterDirectory)) { + + log.error("Dir {} exists: {}", + clusterDirectory, + listFSDir(clusterDirectory)); + throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS, + ErrorStrings.PRINTF_E_INSTANCE_DIR_ALREADY_EXISTS, + clusterDirectory); + } + } + + /** + * Verify that a user has write access to a directory. 
+ * It does this by creating then deleting a temp file + * + * @param dirPath actual directory to look for + * @throws FileNotFoundException file not found + * @throws IOException trouble with FS + * @throws BadClusterStateException if the directory is not writeable + */ + public void verifyDirectoryWriteAccess(Path dirPath) throws IOException, + SliderException { + verifyPathExists(dirPath); + Path tempFile = new Path(dirPath, "tmp-file-for-checks"); + try { + FSDataOutputStream out ; + out = fileSystem.create(tempFile, true); + IOUtils.closeStream(out); + fileSystem.delete(tempFile, false); + } catch (IOException e) { + log.warn("Failed to create file {}: {}", tempFile, e); + throw new BadClusterStateException(e, + "Unable to write to directory %s : %s", dirPath, e.toString()); + } + } + + /** + * Verify that a path exists + * @param path path to check + * @throws FileNotFoundException file not found + * @throws IOException trouble with FS + */ + public void verifyPathExists(Path path) throws IOException { + if (!fileSystem.exists(path)) { + throw new FileNotFoundException(path.toString()); + } + } + + /** + * Verify that a path exists + * @param path path to check + * @throws FileNotFoundException file not found or is not a file + * @throws IOException trouble with FS + */ + public void verifyFileExists(Path path) throws IOException { + FileStatus status = fileSystem.getFileStatus(path); + + if (!status.isFile()) { + throw new FileNotFoundException("Not a file: " + path.toString()); + } + } + + /** + * Given a path, check if it exists and is a file + * + * @param path + * absolute path to the file to check + * @return true if and only if path exists and is a file, false for all other + * reasons including if file check throws IOException + */ + public boolean isFile(Path path) { + boolean isFile = false; + try { + FileStatus status = fileSystem.getFileStatus(path); + if (status.isFile()) { + isFile = true; + } + } catch (IOException e) { + // ignore, isFile is 
already set to false + } + return isFile; + } + + /** + * Get the base path + * + * @return the base path optionally configured by + * {@link YarnServiceConf#YARN_SERVICE_BASE_PATH} + */ + public Path getBaseApplicationPath() { + String configuredBasePath = configuration + .get(YarnServiceConf.YARN_SERVICE_BASE_PATH, + getHomeDirectory() + "/" + YarnServiceConstants.SERVICE_BASE_DIRECTORY); + return new Path(configuredBasePath); + } + + /** + * Get slider dependency parent dir in HDFS + * + * @return the parent dir path of slider.tar.gz in HDFS + */ + public Path getDependencyPath() { + String parentDir = YarnServiceConstants.DEPENDENCY_DIR; + return new Path(String.format(parentDir, VersionInfo.getVersion())); + } + + /** + * Get slider.tar.gz absolute filepath in HDFS + * + * @return the absolute path to slider.tar.gz in HDFS + */ + public Path getDependencyTarGzip() { + Path dependencyLibAmPath = getDependencyPath(); + Path dependencyLibTarGzip = new Path( + dependencyLibAmPath.toUri().toString(), + YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME + + YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT); + return dependencyLibTarGzip; + } + + public Path getHomeDirectory() { + return fileSystem.getHomeDirectory(); + } + + /** + * Create an AM resource from the + * + * @param destPath dest path in filesystem + * @param resourceType resource type + * @return the local resource for AM + */ + public LocalResource createAmResource(Path destPath, LocalResourceType resourceType) throws IOException { + FileStatus destStatus = fileSystem.getFileStatus(destPath); + LocalResource amResource = Records.newRecord(LocalResource.class); + amResource.setType(resourceType); + // Set visibility of the resource + // Setting to most private option + amResource.setVisibility(LocalResourceVisibility.APPLICATION); + // Set the resource to be copied over + amResource.setResource( + URL.fromPath(fileSystem.resolvePath(destStatus.getPath()))); + // Set timestamp and length of file so that the 
framework + // can do basic sanity checks for the local resource + // after it has been copied over to ensure it is the same + // resource the client intended to use with the service + amResource.setTimestamp(destStatus.getModificationTime()); + amResource.setSize(destStatus.getLen()); + return amResource; + } + + /** + * Register all files under a fs path as a directory to push out + * + * @param srcDir src dir + * @param destRelativeDir dest dir (no trailing /) + * @return the map of entries + */ + public Map<String, LocalResource> submitDirectory(Path srcDir, String destRelativeDir) throws IOException { + //now register each of the files in the directory to be + //copied to the destination + FileStatus[] fileset = fileSystem.listStatus(srcDir); + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(fileset.length); + for (FileStatus entry : fileset) { + + LocalResource resource = createAmResource(entry.getPath(), + LocalResourceType.FILE); + String relativePath = destRelativeDir + "/" + entry.getPath().getName(); + localResources.put(relativePath, resource); + } + return localResources; + } + + /** + * Submit a JAR containing a specific class, returning + * the resource to be mapped in + * + * @param clazz class to look for + * @param subdir subdirectory (expected to end in a "/") + * @param jarName <i>At the destination</i> + * @return the local resource ref + * @throws IOException trouble copying to HDFS + */ + public LocalResource submitJarWithClass(Class clazz, Path tempPath, String subdir, String jarName) + throws IOException, SliderException { + File localFile = SliderUtils.findContainingJarOrFail(clazz); + return submitFile(localFile, tempPath, subdir, jarName); + } + + /** + * Submit a local file to the filesystem references by the instance's cluster + * filesystem + * + * @param localFile filename + * @param subdir subdirectory (expected to end in a "/") + * @param destFileName destination filename + * @return the local 
resource ref + * @throws IOException trouble copying to HDFS + */ + public LocalResource submitFile(File localFile, Path tempPath, String subdir, String destFileName) + throws IOException { + Path src = new Path(localFile.toString()); + Path subdirPath = new Path(tempPath, subdir); + fileSystem.mkdirs(subdirPath); + Path destPath = new Path(subdirPath, destFileName); + log.debug("Copying {} (size={} bytes) to {}", localFile, localFile.length(), destPath); + + fileSystem.copyFromLocalFile(false, true, src, destPath); + + // Set the type of resource - file or archive + // archives are untarred at destination + // we don't need the jar file to be untarred for now + return createAmResource(destPath, LocalResourceType.FILE); + } + + /** + * Submit the AM tar.gz resource referenced by the instance's cluster + * filesystem. Also, update the providerResources object with the new + * resource. + * + * @param providerResources + * the provider resource map to be updated + * @throws IOException + * trouble copying to HDFS + */ + public void submitTarGzipAndUpdate( + Map<String, LocalResource> providerResources) throws IOException, + BadClusterStateException { + Path dependencyLibTarGzip = getDependencyTarGzip(); + LocalResource lc = createAmResource(dependencyLibTarGzip, + LocalResourceType.ARCHIVE); + providerResources.put(YarnServiceConstants.DEPENDENCY_LOCALIZED_DIR_LINK, lc); + } + + public void copyLocalFileToHdfs(File localPath, + Path destPath, FsPermission fp) + throws IOException { + if (localPath == null || destPath == null) { + throw new IOException("Either localPath or destPath is null"); + } + fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, + "000"); + fileSystem.mkdirs(destPath.getParent(), fp); + log.info("Copying file {} to {}", localPath.toURI(), + fileSystem.getScheme() + ":/" + destPath.toUri()); + + fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()), + destPath); + // set file permissions of the destPath + 
fileSystem.setPermission(destPath, fp); + } + + public void copyHdfsFileToLocal(Path hdfsPath, File destFile) + throws IOException { + if (hdfsPath == null || destFile == null) { + throw new IOException("Either hdfsPath or destPath is null"); + } + log.info("Copying file {} to {}", hdfsPath.toUri(), destFile.toURI()); + + Path destPath = new Path(destFile.getPath()); + fileSystem.copyToLocalFile(hdfsPath, destPath); + } + + /** + * list entries in a filesystem directory + * + * @param path directory + * @return a listing, one to a line + * @throws IOException + */ + public String listFSDir(Path path) throws IOException { + FileStatus[] stats = fileSystem.listStatus(path); + StringBuilder builder = new StringBuilder(); + for (FileStatus stat : stats) { + builder.append(stat.getPath().toString()) + .append("\t") + .append(stat.getLen()) + .append("\n"); + } + return builder.toString(); + } + + public String cat(Path path) throws IOException { + FileStatus status = fileSystem.getFileStatus(path); + byte[] b = new byte[(int) status.getLen()]; + FSDataInputStream in = null; + try { + in = fileSystem.open(path); + int count = in.read(b); + return new String(b, 0, count, UTF_8); + } finally { + IOUtils.closeStream(in); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/40ab068e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java new file mode 100644 index 0000000..6fadfd3 --- /dev/null +++ 
import java.io.Closeable;

/**
 * A duration in milliseconds. This class can be used
 * to count time, and to be polled to see if a time limit has
 * passed.
 */
public class Duration implements Closeable {
  /** Divisor used to turn {@link System#nanoTime()} into milliseconds. */
  private static final long NANOS_PER_MILLI = 1000000L;

  /** Clock reading taken by {@link #start()}; 0 until then. */
  public long start;
  /** Clock reading taken by {@link #finish()}; 0 until then. */
  public long finish;
  /** Time limit in milliseconds, as passed to the constructor. */
  public final long limit;

  /**
   * Create a duration instance with a limit of 0.
   */
  public Duration() {
    this(0);
  }

  /**
   * Create a duration with a limit specified in millis.
   * @param limit duration in milliseconds
   */
  public Duration(long limit) {
    this.limit = limit;
  }

  /**
   * Record the current time as the start time.
   * @return self
   */
  public Duration start() {
    start = now();
    return this;
  }

  /**
   * The close operation relays to {@link #finish()}.
   * Implementing it allows Duration instances to be automatically
   * finish()'d in Java7 try blocks for when used in measuring durations.
   */
  @Override
  public final void close() {
    finish();
  }

  /**
   * Record the current time as the finish time.
   */
  public void finish() {
    finish = now();
  }

  /**
   * Get the current time in milliseconds, derived from
   * {@link System#nanoTime()}. The value is only meaningful when compared
   * with another value returned by this method.
   * @return the current clock reading in milliseconds
   */
  protected long now() {
    return System.nanoTime() / NANOS_PER_MILLI;
  }

  /**
   * Get the difference between the finish and start times.
   * @return {@code finish - start}, in milliseconds
   */
  public long getInterval() {
    return finish - start;
  }

  /**
   * Return true if the limit has been exceeded.
   * @return true if the limit is not negative and the time elapsed since
   * {@link #start} is strictly greater than it. A negative limit is never
   * reported as exceeded.
   */
  public boolean getLimitExceeded() {
    return limit >= 0 && ((now() - start) > limit);
  }

  @Override
  public String toString() {
    StringBuilder builder = new StringBuilder("Duration");
    if (start == 0 && finish == 0) {
      // neither start() nor finish() has recorded a time yet; without this
      // check "finish >= start" holds and the instance would be reported
      // as finished
      builder.append(" unstarted;");
    } else if (finish >= start) {
      builder.append(" finished at ").append(getInterval()).append(" millis;");
    } else if (start > 0) {
      builder.append(" started but not yet finished;");
    } else {
      builder.append(" unstarted;");
    }
    if (limit > 0) {
      builder.append(" limit: ").append(limit).append(" millis");
      if (getLimitExceeded()) {
        builder.append(" - exceeded");
      }
    }
    return builder.toString();
  }

}