http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/Probe.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/Probe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/Probe.java new file mode 100644 index 0000000..b851fb7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/Probe.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.servicemonitor.probe; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; + +import java.io.IOException; +import java.util.Map; + +/** + * Base class of all probes. + */ +public abstract class Probe implements MonitorKeys { + + protected final Configuration conf; + private String name; + + /** + * Create a probe of a specific name + * + * @param name probe name + * @param conf configuration being stored. + */ + public Probe(String name, Configuration conf) { + this.name = name; + this.conf = conf; + } + + + protected void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + + @Override + public String toString() { + return getName(); + } + + public static String getProperty(Map<String, String> props, String name, + String defaultValue) throws IOException { + String value = props.get(name); + if (StringUtils.isEmpty(value)) { + if (defaultValue == null) { + throw new IOException(name + " not specified"); + } + return defaultValue; + } + return value; + } + + public static int getPropertyInt(Map<String, String> props, String name, + Integer defaultValue) throws IOException { + String value = props.get(name); + if (StringUtils.isEmpty(value)) { + if (defaultValue == null) { + throw new IOException(name + " not specified"); + } + return defaultValue; + } + return Integer.parseInt(value); + } + + /** + * perform any prelaunch initialization + */ + public void init() throws IOException { + + } + + /** + * Ping the endpoint. All exceptions must be caught and included in the + * (failure) status. + * + * @param instance instance to ping + * @return the status + */ + public abstract ProbeStatus ping(ComponentInstance instance); +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/ProbeStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/ProbeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/ProbeStatus.java new file mode 100644 index 0000000..7cd761c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/servicemonitor/probe/ProbeStatus.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.servicemonitor.probe; + +import java.io.Serializable; +import java.util.Date; + +/** + * Status message of a probe. This is designed to be sent over the wire, though the exception + * Had better be unserializable at the far end if that is to work. + */ +public final class ProbeStatus implements Serializable { + private static final long serialVersionUID = 165468L; + + private long timestamp; + private String timestampText; + private boolean success; + private boolean realOutcome; + private String message; + private Throwable thrown; + private transient Probe originator; + + public ProbeStatus() { + } + + public ProbeStatus(long timestamp, String message, Throwable thrown) { + this.success = false; + this.message = message; + this.thrown = thrown; + setTimestamp(timestamp); + } + + public ProbeStatus(long timestamp, String message) { + this.success = true; + setTimestamp(timestamp); + this.message = message; + this.thrown = null; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + timestampText = new Date(timestamp).toString(); + } + + public boolean isSuccess() { + return success; + } + + /** + * Set both the success and the real outcome bits to the same value + * @param success the new value + */ + public void setSuccess(boolean success) { + this.success = success; + realOutcome = success; + } + + public String getTimestampText() { + return timestampText; + } + + public boolean getRealOutcome() { + return realOutcome; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Throwable getThrown() { + return thrown; + } + + public void setThrown(Throwable thrown) { + this.thrown = thrown; + } + + /** + * Get the probe that generated this result. May be null + * @return a possibly null reference to a probe + */ + public Probe getOriginator() { + return originator; + } + + /** + * The probe has succeeded -capture the current timestamp, set + * success to true, and record any other data needed. + * @param probe probe + */ + public void succeed(Probe probe) { + finish(probe, true, probe.getName(), null); + } + + /** + * A probe has failed either because the test returned false, or an exception + * was thrown. The {@link #success} field is set to false, any exception + * thrown is recorded. + * @param probe probe that failed + * @param thrown an exception that was thrown. + */ + public void fail(Probe probe, Throwable thrown) { + finish(probe, false, "Failure in " + probe, thrown); + } + + public void finish(Probe probe, boolean succeeded, String text, Throwable thrown) { + setTimestamp(System.currentTimeMillis()); + setSuccess(succeeded); + originator = probe; + message = text; + this.thrown = thrown; + } + + @Override + public String toString() { + LogEntryBuilder builder = new LogEntryBuilder("Probe Status"); + builder.elt("time", timestampText) + .elt("outcome", (success ? "success" : "failure")); + + if (success != realOutcome) { + builder.elt("originaloutcome", (realOutcome ? "success" : "failure")); + } + builder.elt("message", message); + if (thrown != null) { + builder.elt("exception", thrown); + } + + return builder.toString(); + } + + /** + * Flip the success bit on while the real outcome bit is kept false + */ + public void markAsSuccessful() { + success = true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java index 4f39921..78a7171 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java @@ -84,9 +84,6 @@ public final class ServiceTimelineMetricsConstants { public static final String DESCRIPTION = "DESCRIPTION"; - public static final String UNIQUE_COMPONENT_SUPPORT = - "UNIQUE_COMPONENT_SUPPORT"; - public static final String RUN_PRIVILEGED_CONTAINER = "RUN_PRIVILEGED_CONTAINER"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java index f115063..243baea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java @@ -21,24 +21,19 @@ package org.apache.hadoop.yarn.service.timelineservice; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.TimelineV2Client; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.apache.slider.api.resource.Application; -import org.apache.slider.api.resource.Component; -import org.apache.slider.api.resource.ConfigFile; -import org.apache.slider.api.resource.Configuration; -import org.apache.slider.api.resource.Container; -import org.apache.slider.common.tools.SliderUtils; -import org.apache.slider.server.appmaster.actions.ActionStopSlider; -import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; import org.apache.hadoop.yarn.service.ServiceContext; -import org.apache.slider.server.appmaster.state.AppState; -import org.apache.slider.server.appmaster.state.RoleInstance; +import org.apache.hadoop.yarn.service.api.records.Application; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ConfigFile; +import org.apache.hadoop.yarn.service.api.records.Configuration; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +45,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO; + /** * A single service that publishes all the Timeline Entities. */ @@ -87,7 +84,8 @@ public class ServiceTimelinePublisher extends CompositeService { timelineClient = client; } - public void serviceAttemptRegistered(Application application) { + public void serviceAttemptRegistered(Application application, + org.apache.hadoop.conf.Configuration systemConf) { long currentTimeMillis = application.getLaunchTime() == null ? System.currentTimeMillis() : application.getLaunchTime().getTime(); @@ -114,9 +112,12 @@ public class ServiceTimelinePublisher extends CompositeService { // publish before configurations published putEntity(entity); - // publish application specific configurations - publishConfigurations(application.getConfiguration(), application.getId(), - ServiceTimelineEntityType.SERVICE_ATTEMPT.toString(), true); + // publish system config - YarnConfiguration + populateTimelineEntity(systemConf.iterator(), application.getId(), + ServiceTimelineEntityType.SERVICE_ATTEMPT.toString()); + // publish user conf + publishUserConf(application.getConfiguration(), application.getId(), + ServiceTimelineEntityType.SERVICE_ATTEMPT.toString()); // publish component as separate entity. publishComponents(application.getComponents()); @@ -129,12 +130,14 @@ public class ServiceTimelinePublisher extends CompositeService { putEntity(entity); } - public void serviceAttemptUnregistered(ServiceContext context) { + public void serviceAttemptUnregistered(ServiceContext context, + String diagnostics) { TimelineEntity entity = createServiceAttemptEntity( context.attemptId.getApplicationId().toString()); Map<String, Object> entityInfos = new HashMap<String, Object>(); entityInfos.put(ServiceTimelineMetricsConstants.STATE, - FinalApplicationStatus.FAILED); + FinalApplicationStatus.ENDED); + entityInfos.put(DIAGNOSTICS_INFO, diagnostics); entity.addInfo(entityInfos); // add an event @@ -147,39 +150,6 @@ public class ServiceTimelinePublisher extends CompositeService { putEntity(entity); } - public void serviceAttemptUnregistered(AppState appState, - ActionStopSlider stopAction) { - long currentTimeMillis = System.currentTimeMillis(); - - TimelineEntity entity = - createServiceAttemptEntity(appState.getClusterStatus().getId()); - - // add info - Map<String, Object> entityInfos = new HashMap<String, Object>(); - entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE, - stopAction.getExitCode()); - entityInfos.put(ServiceTimelineMetricsConstants.STATE, - stopAction.getFinalApplicationStatus().toString()); - if (stopAction.getMessage() != null) { - entityInfos.put(ServiceTimelineMetricsConstants.EXIT_REASON, - stopAction.getMessage()); - } - if (stopAction.getEx() != null) { - entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO, - stopAction.getEx().toString()); - } - entity.addInfo(entityInfos); - - // add an event - TimelineEvent startEvent = new TimelineEvent(); - startEvent - .setId(ServiceTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString()); - startEvent.setTimestamp(currentTimeMillis); - entity.addEvent(startEvent); - - putEntity(entity); - } - public void componentInstanceStarted(Container container, ComponentInstance instance) { @@ -210,29 +180,6 @@ public class ServiceTimelinePublisher extends CompositeService { putEntity(entity); } - public void componentInstanceFinished(RoleInstance instance) { - TimelineEntity entity = createComponentInstanceEntity(instance.id); - - // create info keys - Map<String, Object> entityInfos = new HashMap<String, Object>(); - entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE, - instance.exitCode); - entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO, - instance.diagnostics); - // TODO need to change the state based on enum value. - entityInfos.put(ServiceTimelineMetricsConstants.STATE, "FINISHED"); - entity.addInfo(entityInfos); - - // add an event - TimelineEvent startEvent = new TimelineEvent(); - startEvent - .setId(ServiceTimelineEvent.COMPONENT_INSTANCE_UNREGISTERED.toString()); - startEvent.setTimestamp(System.currentTimeMillis()); - entity.addEvent(startEvent); - - putEntity(entity); - } - public void componentInstanceFinished(ComponentInstance instance, int exitCode, ContainerState state, String diagnostics) { TimelineEntity entity = createComponentInstanceEntity( @@ -242,7 +189,7 @@ public class ServiceTimelinePublisher extends CompositeService { Map<String, Object> entityInfos = new HashMap<String, Object>(); entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE, exitCode); - entityInfos.put(ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO, diagnostics); + entityInfos.put(DIAGNOSTICS_INFO, diagnostics); entityInfos.put(ServiceTimelineMetricsConstants.STATE, state); entity.addInfo(entityInfos); @@ -302,8 +249,6 @@ public class ServiceTimelinePublisher extends CompositeService { entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_COMMAND, component.getLaunchCommand()); } - entityInfos.put(ServiceTimelineMetricsConstants.UNIQUE_COMPONENT_SUPPORT, - component.getUniqueComponentSupport().toString()); entityInfos.put(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER, component.getRunPrivilegedContainer().toString()); if (component.getPlacementPolicy() != null) { @@ -315,31 +260,26 @@ public class ServiceTimelinePublisher extends CompositeService { putEntity(entity); // publish component specific configurations - publishConfigurations(component.getConfiguration(), component.getName(), - ServiceTimelineEntityType.COMPONENT.toString(), false); + publishUserConf(component.getConfiguration(), component.getName(), + ServiceTimelineEntityType.COMPONENT.toString()); } } - private void publishConfigurations(Configuration configuration, - String entityId, String entityType, boolean isServiceAttemptEntity) { - if (isServiceAttemptEntity) { - // publish slider-client.xml properties at service level - publishConfigurations(SliderUtils.loadSliderClientXML().iterator(), - entityId, entityType); - } - publishConfigurations(configuration.getProperties().entrySet().iterator(), + private void publishUserConf(Configuration configuration, + String entityId, String entityType) { + populateTimelineEntity(configuration.getProperties().entrySet().iterator(), entityId, entityType); - publishConfigurations(configuration.getEnv().entrySet().iterator(), + populateTimelineEntity(configuration.getEnv().entrySet().iterator(), entityId, entityType); for (ConfigFile configFile : configuration.getFiles()) { - publishConfigurations(configFile.getProps().entrySet().iterator(), + populateTimelineEntity(configFile.getProps().entrySet().iterator(), entityId, entityType); } } - private void publishConfigurations(Iterator<Entry<String, String>> iterator, + private void populateTimelineEntity(Iterator<Entry<String, String>> iterator, String entityId, String entityType) { int configSize = 0; TimelineEntity entity = createTimelineEntity(entityId, entityType); http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java new file mode 100644 index 0000000..2607c08 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; + +import java.io.IOException; + +/** + * Persistence of {@link SerializedApplicationReport} + * + */ +public class ApplicationReportSerDeser + extends JsonSerDeser<SerializedApplicationReport> { + public ApplicationReportSerDeser() { + super(SerializedApplicationReport.class); + } + + + private static final ApplicationReportSerDeser + staticinstance = new ApplicationReportSerDeser(); + + /** + * Convert an instance to a JSON string -sync access to a shared ser/deser + * object instance + * @param instance object to convert + * @return a JSON string description + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public static String toString(SerializedApplicationReport instance) + throws IOException, JsonGenerationException, JsonMappingException { + synchronized (staticinstance) { + return staticinstance.toJson(instance); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java new file mode 100644 index 0000000..86896b2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.encodeForRegistry; +import static org.apache.hadoop.registry.client.binding.RegistryUtils.convertUsername; +import static org.apache.hadoop.registry.client.binding.RegistryUtils.getCurrentUsernameUnencoded; +import static org.apache.hadoop.registry.client.binding.RegistryUtils.servicePath; + +/** + * Generic code to get the URLs for clients via the registry + */ +public class ClientRegistryBinder { + private static final Logger log = + LoggerFactory.getLogger(ClientRegistryBinder.class); + + private final RegistryOperations operations; + + public ClientRegistryBinder(RegistryOperations operations) { + this.operations = operations; + } + + /** + * Buld the user path -switches to the system path if the user is "". + * It also cross-converts the username to ascii via punycode + * @param username username or "" + * @return the path to the user + */ + public static String homePathForUser(String username) { + Preconditions.checkArgument(username != null, "null user"); + + // catch recursion + if (username.startsWith(RegistryConstants.PATH_USERS)) { + return username; + } + + if (username.isEmpty()) { + return RegistryConstants.PATH_SYSTEM_SERVICES; + } + + // convert username to registry name + String convertedName = convertUsername(username); + + return RegistryPathUtils.join(RegistryConstants.PATH_USERS, + encodeForRegistry(convertedName)); + } + + /** + * Get the current username, before any encoding has been applied. + * @return the current user from the kerberos identity, falling back + * to the user and/or env variables. + */ + public static String currentUsernameUnencoded() { + String env_hadoop_username = System.getenv( + RegistryInternalConstants.HADOOP_USER_NAME); + return getCurrentUsernameUnencoded(env_hadoop_username); + } + + /** + * Qualify a user. + * <ol> + * <li> <code>"~"</code> maps to user home path home</li> + * <li> <code>"~user"</code> maps to <code>/users/$user</code></li> + * <li> <code>"/"</code> maps to <code>/services/</code></li> + * </ol> + * @param user the username + * @return the base path + */ + public static String qualifyUser(String user) { + // qualify the user + String t = user.trim(); + if (t.startsWith("/")) { + // already resolved + return t; + } else if (t.equals("~")) { + // self + return currentUsernameUnencoded(); + } else if (t.startsWith("~")) { + // another user + // convert username to registry name + String convertedName = convertUsername(t.substring(1)); + + return RegistryPathUtils.join(RegistryConstants.PATH_USERS, + encodeForRegistry(convertedName)); + } else { + return "/" + t; + } + } + + /** + * Look up an external REST API + * @param user user which will be qualified as per {@link #qualifyUser(String)} + * @param serviceClass service class + * @param instance instance name + * @param api API + * @return the API, or an exception is raised. + * @throws IOException + */ + public String lookupExternalRestAPI(String user, + String serviceClass, + String instance, + String api) + throws IOException { + String qualified = qualifyUser(user); + String path = servicePath(qualified, serviceClass, instance); + String restAPI = resolveExternalRestAPI(api, path); + if (restAPI == null) { + throw new PathNotFoundException(path + " API " + api); + } + return restAPI; + } + + /** + * Resolve a service record then return an external REST API exported it. + * + * @param api API to resolve + * @param path path of the service record + * @return null if the record exists but the API is absent or it has no + * REST endpoints. + * @throws IOException resolution problems, as covered in + * {@link RegistryOperations#resolve(String)} + */ + protected String resolveExternalRestAPI(String api, String path) throws + IOException { + ServiceRecord record = operations.resolve(path); + return lookupRestAPI(record, api, true); + } + + /** + * Look up an external REST API endpoint + * @param record service record + * @param api URI of api + * @param external flag to indicate this is an external record + * @return the first endpoint of the implementation, or null if there + * is no entry for the API, implementation or it's the wrong type. + */ + public static String lookupRestAPI(ServiceRecord record, + String api, boolean external) throws InvalidRecordException { + try { + String url = null; + Endpoint endpoint = getEndpoint(record, api, external); + List<String> addresses = + RegistryTypeUtils.retrieveAddressesUriType(endpoint); + if (addresses != null && !addresses.isEmpty()) { + url = addresses.get(0); + } + return url; + } catch (InvalidRecordException e) { + log.debug("looking for API {}", api, e); + return null; + } + } + + /** + * Get an endpont by API + * @param record service record + * @param api API + * @param external flag to indicate this is an external record + * @return the endpoint or null + */ + public static Endpoint getEndpoint(ServiceRecord record, + String api, + boolean external) { + return external ? record.getExternalEndpoint(api) + : record.getInternalEndpoint(api); + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java new file mode 100644 index 0000000..9f0e5d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import java.io.Serializable; +import java.util.Comparator; + +/** + * Some general comparators + */ +public class Comparators { + + public static class LongComparator implements Comparator<Long>, Serializable { + @Override + public int compare(Long o1, Long o2) { + return o1.compareTo(o2); + } + } + + public static class InvertedLongComparator + implements Comparator<Long>, Serializable { + @Override + public int compare(Long o1, Long o2) { + return o2.compareTo(o1); + } + } + + /** + * Little template class to reverse any comparitor + * @param <CompareType> the type that is being compared + */ + public static class ComparatorReverser<CompareType> implements Comparator<CompareType>, + Serializable { + + final Comparator<CompareType> instance; + + public ComparatorReverser(Comparator<CompareType> instance) { + this.instance = instance; + } + + @Override + public int compare(CompareType first, CompareType second) { + return instance.compare(second, first); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java new file mode 100644 index 0000000..fe8cce8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.service.exceptions.BadConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringWriter; +import java.net.URL; +import java.util.Map; + +/** + * Methods to aid in config, both in the Configuration class and + * with other parts of setting up Slider-initated processes. + * + * Some of the methods take an argument of a map iterable for their sources; this allows + * the same method + */ +public class ConfigHelper { + private static final Logger log = LoggerFactory.getLogger(ConfigHelper.class); + + /** + * Set an entire map full of values + * + * @param config config to patch + * @param map map of data + * @param origin origin data + */ + public static void addConfigMap(Configuration config, + Map<String, String> map, + String origin) throws BadConfigException { + addConfigMap(config, map.entrySet(), origin); + } + + /** + * Set an entire map full of values + * + * @param config config to patch + * @param map map of data + * @param origin origin data + */ + public static void addConfigMap(Configuration config, + Iterable<Map.Entry<String, String>> map, + String origin) throws BadConfigException { + for (Map.Entry<String, String> mapEntry : map) { + String key = mapEntry.getKey(); + String value = mapEntry.getValue(); + if (value == null) { + throw new BadConfigException("Null value for property " + key); + } + config.set(key, value, origin); + } + } + + /** + * Convert to an XML string + * @param conf configuration + * @return conf + * @throws IOException + */ + public static String toXml(Configuration conf) throws IOException { + StringWriter writer = new StringWriter(); + conf.writeXml(writer); + return writer.toString(); + } + + + /** + * Register a resource as a default resource. + * Do not attempt to use this unless you understand that the + * order in which default resources are loaded affects the outcome, + * and that subclasses of Configuration often register new default + * resources + * @param resource the resource name + * @return the URL or null + */ + public static URL registerDefaultResource(String resource) { + URL resURL = getResourceUrl(resource); + if (resURL != null) { + Configuration.addDefaultResource(resource); + } + return resURL; + } + + /** + * Load a configuration from a resource on this classpath. + * If the resource is not found, an empty configuration is returned + * @param resource the resource name + * @return the loaded configuration. + */ + public static Configuration loadFromResource(String resource) { + Configuration conf = new Configuration(false); + URL resURL = getResourceUrl(resource); + if (resURL != null) { + log.debug("loaded resources from {}", resURL); + conf.addResource(resource); + } else{ + log.debug("failed to find {} on the classpath", resource); + } + return conf; + + } + + /** + * Get the URL to a resource, null if not on the CP + * @param resource resource to look for + * @return the URL or null + */ + public static URL getResourceUrl(String resource) { + return ConfigHelper.class.getClassLoader() + .getResource(resource); + } + + /** + * This goes through the keyset of one configuration and retrieves each value + * from a value source -a different or the same configuration. This triggers + * the property resolution process of the value, resolving any variables against + * in-config or inherited configurations + * @param keysource source of keys + * @param valuesource the source of values + * @return a new configuration where <code>foreach key in keysource, get(key)==valuesource.get(key)</code> + */ + public static Configuration resolveConfiguration( + Iterable<Map.Entry<String, String>> keysource, + Configuration valuesource) { + Configuration result = new Configuration(false); + for (Map.Entry<String, String> entry : keysource) { + String key = entry.getKey(); + String value = valuesource.get(key); + Preconditions.checkState(value != null, + "no reference for \"%s\" in values", key); + result.set(key, value); + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java new file mode 100644 index 0000000..a969be9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.utils; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.service.api.records.ConfigFormat; +import org.apache.hadoop.yarn.service.utils.SliderFileSystem; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ConfigUtils { + public static final String TEMPLATE_FILE = "template.file"; + + public static String replaceProps(Map<String, String> config, String content) { + Map<String, String> tokens = new HashMap<>(); + for (Entry<String, String> entry : config.entrySet()) { + tokens.put("${" + entry.getKey() + "}", entry.getValue()); + tokens.put("{{" + entry.getKey() + "}}", entry.getValue()); + } + String value = content; + for (Map.Entry<String,String> token : tokens.entrySet()) { + value = value.replaceAll(Pattern.quote(token.getKey()), + Matcher.quoteReplacement(token.getValue())); + } + return value; + } + + public static Map<String, String> replacePropsInConfig( + Map<String, String> config, Map<String, String> env) { + Map<String, String> tokens = new HashMap<>(); + for (Entry<String, String> entry : env.entrySet()) { + tokens.put("${" + entry.getKey() + "}", entry.getValue()); + } + Map<String, String> newConfig = new HashMap<>(); + for (Entry<String, String> entry : config.entrySet()) { + String value = entry.getValue(); + for (Map.Entry<String,String> token : tokens.entrySet()) { + value = value.replaceAll(Pattern.quote(token.getKey()), + Matcher.quoteReplacement(token.getValue())); + } + newConfig.put(entry.getKey(), entry.getValue()); + } + return newConfig; + } + + public static void prepConfigForTemplateOutputter(ConfigFormat configFormat, + Map<String, String> config, SliderFileSystem fileSystem, + String clusterName, String fileName) throws IOException { + if (!configFormat.equals(ConfigFormat.TEMPLATE)) { + return; + } + Path templateFile = null; + if (config.containsKey(TEMPLATE_FILE)) { + templateFile = fileSystem.buildResourcePath(config.get(TEMPLATE_FILE)); + if (!fileSystem.isFile(templateFile)) { + templateFile = fileSystem.buildResourcePath(clusterName, + config.get(TEMPLATE_FILE)); + } + if (!fileSystem.isFile(templateFile)) { + throw new IOException("config specified template file " + config + .get(TEMPLATE_FILE) + " but " + templateFile + " doesn't exist"); + } + } + if (templateFile == null && fileName != null) { + templateFile = fileSystem.buildResourcePath(fileName); + if (!fileSystem.isFile(templateFile)) { + templateFile = fileSystem.buildResourcePath(clusterName, + fileName); + } + } + if (fileSystem.isFile(templateFile)) { + config.put("content", fileSystem.cat(templateFile)); + } else { + config.put("content", ""); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java new file mode 100644 index 0000000..fa3b402 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.service.conf.SliderExitCodes; +import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException; +import org.apache.hadoop.yarn.service.exceptions.ErrorStrings; +import org.apache.hadoop.yarn.service.exceptions.SliderException; +import org.apache.hadoop.yarn.util.Records; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class CoreFileSystem { + private static final Logger + log = LoggerFactory.getLogger(CoreFileSystem.class); + + private static final String UTF_8 = "UTF-8"; + + protected final FileSystem fileSystem; + protected final Configuration configuration; + + public CoreFileSystem(FileSystem fileSystem, Configuration configuration) { + Preconditions.checkNotNull(fileSystem, + "Cannot create a CoreFileSystem with a null FileSystem"); + Preconditions.checkNotNull(configuration, + "Cannot create a CoreFileSystem with a null Configuration"); + this.fileSystem = fileSystem; + this.configuration = configuration; + } + + public CoreFileSystem(Configuration configuration) throws IOException { + Preconditions.checkNotNull(configuration, + "Cannot create a CoreFileSystem with a null Configuration"); + this.fileSystem = FileSystem.get(configuration); + this.configuration = configuration; + } + + /** + * Get the temp path for this cluster + * @param clustername name of the cluster + * @return path for temp files (is not purged) + */ + public Path getTempPathForCluster(String clustername) { + Path clusterDir = buildClusterDirPath(clustername); + return new Path(clusterDir, YarnServiceConstants.TMP_DIR_PREFIX); + } + + /** + * Returns the underlying FileSystem for this object. + * + * @return filesystem + */ + public FileSystem getFileSystem() { + return fileSystem; + } + + @Override + public String toString() { + final StringBuilder sb = + new StringBuilder("CoreFileSystem{"); + sb.append("fileSystem=").append(fileSystem.getUri()); + sb.append('}'); + return sb.toString(); + } + + /** + * Build up the path string for a cluster instance -no attempt to + * create the directory is made + * + * @param clustername name of the cluster + * @return the path for persistent data + */ + public Path buildClusterDirPath(String clustername) { + Preconditions.checkNotNull(clustername); + Path path = getBaseApplicationPath(); + return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername); + } + + + /** + * Build up the path string for keytab install location -no attempt to + * create the directory is made + * + * @return the path for keytab + */ + public Path buildKeytabInstallationDirPath(String keytabFolder) { + Preconditions.checkNotNull(keytabFolder); + Path path = getBaseApplicationPath(); + return new Path(path, YarnServiceConstants.KEYTAB_DIR + "/" + keytabFolder); + } + + /** + * Build up the path string for keytab install location -no attempt to + * create the directory is made + * + * @return the path for keytab installation location + */ + public Path buildKeytabPath(String keytabDir, String keytabName, String clusterName) { + Path homePath = getHomeDirectory(); + Path baseKeytabDir; + if (keytabDir != null) { + baseKeytabDir = new Path(homePath, keytabDir); + } else { + baseKeytabDir = new Path(buildClusterDirPath(clusterName), + YarnServiceConstants.KEYTAB_DIR); + } + return keytabName == null ? baseKeytabDir : + new Path(baseKeytabDir, keytabName); + } + + /** + * Build up the path string for resource install location -no attempt to + * create the directory is made + * + * @return the path for resource + */ + public Path buildResourcePath(String resourceFolder) { + Preconditions.checkNotNull(resourceFolder); + Path path = getBaseApplicationPath(); + return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + resourceFolder); + } + + /** + * Build up the path string for resource install location -no attempt to + * create the directory is made + * + * @return the path for resource + */ + public Path buildResourcePath(String dirName, String fileName) { + Preconditions.checkNotNull(dirName); + Preconditions.checkNotNull(fileName); + Path path = getBaseApplicationPath(); + return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + dirName + "/" + fileName); + } + + /** + * Create a directory with the given permissions. + * + * @param dir directory + * @param clusterPerms cluster permissions + * @throws IOException IO problem + * @throws BadClusterStateException any cluster state problem + */ + @SuppressWarnings("deprecation") + public void createWithPermissions(Path dir, FsPermission clusterPerms) throws + IOException, + BadClusterStateException { + if (fileSystem.isFile(dir)) { + // HADOOP-9361 shows some filesystems don't correctly fail here + throw new BadClusterStateException( + "Cannot create a directory over a file %s", dir); + } + log.debug("mkdir {} with perms {}", dir, clusterPerms); + //no mask whatoever + fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); + fileSystem.mkdirs(dir, clusterPerms); + //and force set it anyway just to make sure + fileSystem.setPermission(dir, clusterPerms); + } + + /** + * Verify that the cluster directory is not present + * + * @param clustername name of the cluster + * @param clusterDirectory actual directory to look for + * @throws IOException trouble with FS + * @throws SliderException If the directory exists + */ + public void verifyClusterDirectoryNonexistent(String clustername, + Path clusterDirectory) + throws IOException, SliderException { + if (fileSystem.exists(clusterDirectory)) { + throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS, + ErrorStrings.PRINTF_E_INSTANCE_ALREADY_EXISTS, clustername, + clusterDirectory); + } + } + /** + * Verify that the given directory is not present + * + * @param clusterDirectory actual directory to look for + * @throws IOException trouble with FS + * @throws SliderException If the directory exists + */ + public void verifyDirectoryNonexistent(Path clusterDirectory) throws + IOException, + SliderException { + if (fileSystem.exists(clusterDirectory)) { + + log.error("Dir {} exists: {}", + clusterDirectory, + listFSDir(clusterDirectory)); + throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS, + ErrorStrings.PRINTF_E_INSTANCE_DIR_ALREADY_EXISTS, + clusterDirectory); + } + } + + /** + * Verify that a user has write access to a directory. + * It does this by creating then deleting a temp file + * + * @param dirPath actual directory to look for + * @throws FileNotFoundException file not found + * @throws IOException trouble with FS + * @throws BadClusterStateException if the directory is not writeable + */ + public void verifyDirectoryWriteAccess(Path dirPath) throws IOException, + SliderException { + verifyPathExists(dirPath); + Path tempFile = new Path(dirPath, "tmp-file-for-checks"); + try { + FSDataOutputStream out ; + out = fileSystem.create(tempFile, true); + IOUtils.closeStream(out); + fileSystem.delete(tempFile, false); + } catch (IOException e) { + log.warn("Failed to create file {}: {}", tempFile, e); + throw new BadClusterStateException(e, + "Unable to write to directory %s : %s", dirPath, e.toString()); + } + } + + /** + * Verify that a path exists + * @param path path to check + * @throws FileNotFoundException file not found + * @throws IOException trouble with FS + */ + public void verifyPathExists(Path path) throws IOException { + if (!fileSystem.exists(path)) { + throw new FileNotFoundException(path.toString()); + } + } + + /** + * Verify that a path exists + * @param path path to check + * @throws FileNotFoundException file not found or is not a file + * @throws IOException trouble with FS + */ + public void verifyFileExists(Path path) throws IOException { + FileStatus status = fileSystem.getFileStatus(path); + + if (!status.isFile()) { + throw new FileNotFoundException("Not a file: " + path.toString()); + } + } + + /** + * Given a path, check if it exists and is a file + * + * @param path + * absolute path to the file to check + * @return true if and only if path exists and is a file, false for all other + * reasons including if file check throws IOException + */ + public boolean isFile(Path path) { + boolean isFile = false; + try { + FileStatus status = fileSystem.getFileStatus(path); + if (status.isFile()) { + isFile = true; + } + } catch (IOException e) { + // ignore, isFile is already set to false + } + return isFile; + } + + /** + * Get the base path + * + * @return the base path optionally configured by + * {@link YarnServiceConf#YARN_SERVICE_BASE_PATH} + */ + public Path getBaseApplicationPath() { + String configuredBasePath = configuration + .get(YarnServiceConf.YARN_SERVICE_BASE_PATH, + getHomeDirectory() + "/" + YarnServiceConstants.SERVICE_BASE_DIRECTORY); + return new Path(configuredBasePath); + } + + /** + * Get slider dependency parent dir in HDFS + * + * @return the parent dir path of slider.tar.gz in HDFS + */ + public Path getDependencyPath() { + String parentDir = YarnServiceConstants.DEPENDENCY_DIR; + return new Path(String.format(parentDir, VersionInfo.getVersion())); + } + + /** + * Get slider.tar.gz absolute filepath in HDFS + * + * @return the absolute path to slider.tar.gz in HDFS + */ + public Path getDependencyTarGzip() { + Path dependencyLibAmPath = getDependencyPath(); + Path dependencyLibTarGzip = new Path( + dependencyLibAmPath.toUri().toString(), + YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME + + YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT); + return dependencyLibTarGzip; + } + + public Path getHomeDirectory() { + return fileSystem.getHomeDirectory(); + } + + /** + * Create an AM resource from the + * + * @param destPath dest path in filesystem + * @param resourceType resource type + * @return the local resource for AM + */ + public LocalResource createAmResource(Path destPath, LocalResourceType resourceType) throws IOException { + FileStatus destStatus = fileSystem.getFileStatus(destPath); + LocalResource amResource = Records.newRecord(LocalResource.class); + amResource.setType(resourceType); + // Set visibility of the resource + // Setting to most private option + amResource.setVisibility(LocalResourceVisibility.APPLICATION); + // Set the resource to be copied over + amResource.setResource( + URL.fromPath(fileSystem.resolvePath(destStatus.getPath()))); + // Set timestamp and length of file so that the framework + // can do basic sanity checks for the local resource + // after it has been copied over to ensure it is the same + // resource the client intended to use with the application + amResource.setTimestamp(destStatus.getModificationTime()); + amResource.setSize(destStatus.getLen()); + return amResource; + } + + /** + * Register all files under a fs path as a directory to push out + * + * @param srcDir src dir + * @param destRelativeDir dest dir (no trailing /) + * @return the map of entries + */ + public Map<String, LocalResource> submitDirectory(Path srcDir, String destRelativeDir) throws IOException { + //now register each of the files in the directory to be + //copied to the destination + FileStatus[] fileset = fileSystem.listStatus(srcDir); + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(fileset.length); + for (FileStatus entry : fileset) { + + LocalResource resource = createAmResource(entry.getPath(), + LocalResourceType.FILE); + String relativePath = destRelativeDir + "/" + entry.getPath().getName(); + localResources.put(relativePath, resource); + } + return localResources; + } + + /** + * Submit a JAR containing a specific class, returning + * the resource to be mapped in + * + * @param clazz class to look for + * @param subdir subdirectory (expected to end in a "/") + * @param jarName <i>At the destination</i> + * @return the local resource ref + * @throws IOException trouble copying to HDFS + */ + public LocalResource submitJarWithClass(Class clazz, Path tempPath, String subdir, String jarName) + throws IOException, SliderException { + File localFile = SliderUtils.findContainingJarOrFail(clazz); + return submitFile(localFile, tempPath, subdir, jarName); + } + + /** + * Submit a local file to the filesystem references by the instance's cluster + * filesystem + * + * @param localFile filename + * @param subdir subdirectory (expected to end in a "/") + * @param destFileName destination filename + * @return the local resource ref + * @throws IOException trouble copying to HDFS + */ + public LocalResource submitFile(File localFile, Path tempPath, String subdir, String destFileName) + throws IOException { + Path src = new Path(localFile.toString()); + Path subdirPath = new Path(tempPath, subdir); + fileSystem.mkdirs(subdirPath); + Path destPath = new Path(subdirPath, destFileName); + log.debug("Copying {} (size={} bytes) to {}", localFile, localFile.length(), destPath); + + fileSystem.copyFromLocalFile(false, true, src, destPath); + + // Set the type of resource - file or archive + // archives are untarred at destination + // we don't need the jar file to be untarred for now + return createAmResource(destPath, LocalResourceType.FILE); + } + + /** + * Submit the AM tar.gz resource referenced by the instance's cluster + * filesystem. Also, update the providerResources object with the new + * resource. + * + * @param providerResources + * the provider resource map to be updated + * @throws IOException + * trouble copying to HDFS + */ + public void submitTarGzipAndUpdate( + Map<String, LocalResource> providerResources) throws IOException, + BadClusterStateException { + Path dependencyLibTarGzip = getDependencyTarGzip(); + LocalResource lc = createAmResource(dependencyLibTarGzip, + LocalResourceType.ARCHIVE); + providerResources.put(YarnServiceConstants.DEPENDENCY_LOCALIZED_DIR_LINK, lc); + } + + public void copyLocalFileToHdfs(File localPath, + Path destPath, FsPermission fp) + throws IOException { + if (localPath == null || destPath == null) { + throw new IOException("Either localPath or destPath is null"); + } + fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, + "000"); + fileSystem.mkdirs(destPath.getParent(), fp); + log.info("Copying file {} to {}", localPath.toURI(), + fileSystem.getScheme() + ":/" + destPath.toUri()); + + fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()), + destPath); + // set file permissions of the destPath + fileSystem.setPermission(destPath, fp); + } + + public void copyHdfsFileToLocal(Path hdfsPath, File destFile) + throws IOException { + if (hdfsPath == null || destFile == null) { + throw new IOException("Either hdfsPath or destPath is null"); + } + log.info("Copying file {} to {}", hdfsPath.toUri(), destFile.toURI()); + + Path destPath = new Path(destFile.getPath()); + fileSystem.copyToLocalFile(hdfsPath, destPath); + } + + /** + * list entries in a filesystem directory + * + * @param path directory + * @return a listing, one to a line + * @throws IOException + */ + public String listFSDir(Path path) throws IOException { + FileStatus[] stats = fileSystem.listStatus(path); + StringBuilder builder = new StringBuilder(); + for (FileStatus stat : stats) { + builder.append(stat.getPath().toString()) + .append("\t") + .append(stat.getLen()) + .append("\n"); + } + return builder.toString(); + } + + public String cat(Path path) throws IOException { + FileStatus status = fileSystem.getFileStatus(path); + byte[] b = new byte[(int) status.getLen()]; + FSDataInputStream in = null; + try { + in = fileSystem.open(path); + int count = in.read(b); + return new String(b, 0, count, UTF_8); + } finally { + IOUtils.closeStream(in); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java new file mode 100644 index 0000000..6fadfd3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import java.io.Closeable; + +/** + * A duration in milliseconds. This class can be used + * to count time, and to be polled to see if a time limit has + * passed. + */ +public class Duration implements Closeable { + public long start, finish; + public final long limit; + + /** + * Create a duration instance with a limit of 0 + */ + public Duration() { + this(0); + } + + /** + * Create a duration with a limit specified in millis + * @param limit duration in milliseconds + */ + public Duration(long limit) { + this.limit = limit; + } + + /** + * Start + * @return self + */ + public Duration start() { + start = now(); + return this; + } + + /** + * The close operation relays to {@link #finish()}. + * Implementing it allows Duration instances to be automatically + * finish()'d in Java7 try blocks for when used in measuring durations. + */ + @Override + public final void close() { + finish(); + } + + public void finish() { + finish = now(); + } + + protected long now() { + return System.nanoTime()/1000000; + } + + public long getInterval() { + return finish - start; + } + + /** + * return true if the limit has been exceeded + * @return true if a limit was set and the current time + * exceeds it. + */ + public boolean getLimitExceeded() { + return limit >= 0 && ((now() - start) > limit); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("Duration"); + if (finish >= start) { + builder.append(" finished at ").append(getInterval()).append(" millis;"); + } else { + if (start > 0) { + builder.append(" started but not yet finished;"); + } else { + builder.append(" unstarted;"); + } + } + if (limit > 0) { + builder.append(" limit: ").append(limit).append(" millis"); + if (getLimitExceeded()) { + builder.append(" - exceeded"); + } + } + return builder.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/394183c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java new file mode 100644 index 0000000..7b22e3e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.PropertyNamingStrategy; +import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Support for marshalling objects to and from JSON. + * This class is NOT thread safe; it constructs an object mapper + * as an instance field. + * @param <T> + */ +public class JsonSerDeser<T> { + + private static final Logger log = LoggerFactory.getLogger(JsonSerDeser.class); + private static final String UTF_8 = "UTF-8"; + + private final Class<T> classType; + private final ObjectMapper mapper; + + /** + * Create an instance bound to a specific type + * @param classType class type + */ + public JsonSerDeser(Class<T> classType) { + this.classType = classType; + this.mapper = new ObjectMapper(); + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + public JsonSerDeser(Class<T> classType, PropertyNamingStrategy namingStrategy) { + this(classType); + mapper.setPropertyNamingStrategy(namingStrategy); + } + + /** + * Convert from JSON + * @param json input + * @return the parsed JSON + * @throws IOException IO + * @throws JsonMappingException failure to map from the JSON to this class + */ + public T fromJson(String json) + throws IOException, JsonParseException, JsonMappingException { + try { + return mapper.readValue(json, classType); + } catch (IOException e) { + log.error("Exception while parsing json : " + e + "\n" + json, e); + throw e; + } + } + + /** + * Convert from a JSON file + * @param jsonFile input file + * @return the parsed JSON + * @throws IOException IO problems + * @throws JsonMappingException failure to map from the JSON to this class + */ + public T fromFile(File jsonFile) + throws IOException, JsonParseException, JsonMappingException { + File absoluteFile = jsonFile.getAbsoluteFile(); + try { + return mapper.readValue(absoluteFile, classType); + } catch (IOException e) { + log.error("Exception while parsing json file {}", absoluteFile, e); + throw e; + } + } + + /** + * Convert from a JSON file + * @param resource input file + * @return the parsed JSON + * @throws IOException IO problems + * @throws JsonMappingException failure to map from the JSON to this class + */ + public T fromResource(String resource) + throws IOException, JsonParseException, JsonMappingException { + try(InputStream resStream = this.getClass().getResourceAsStream(resource)) { + if (resStream == null) { + throw new FileNotFoundException(resource); + } + return (T) (mapper.readValue(resStream, classType)); + } catch (IOException e) { + log.error("Exception while parsing json resource {}", resource, e); + throw e; + } + } + + /** + * Convert from an input stream, closing the stream afterwards. + * @param stream + * @return the parsed JSON + * @throws IOException IO problems + */ + public T fromStream(InputStream stream) throws IOException { + try { + return (T) (mapper.readValue(stream, classType)); + } catch (IOException e) { + log.error("Exception while parsing json input stream", e); + throw e; + } finally { + IOUtils.closeStream(stream); + } + } + + /** + * clone by converting to JSON and back again. + * This is much less efficient than any Java clone process. + * @param instance instance to duplicate + * @return a new instance + * @throws IOException problems. + */ + public T fromInstance(T instance) throws IOException { + return fromJson(toJson(instance)); + } + + /** + * Deserialize from a byte array + * @param b + * @return the deserialized value + * @throws IOException parse problems + */ + public T fromBytes(byte[] b) throws IOException { + String json = new String(b, 0, b.length, UTF_8); + return fromJson(json); + } + + /** + * Load from a Hadoop filesystem + * @param fs filesystem + * @param path path + * @return a loaded CD + * @throws IOException IO problems + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public T load(FileSystem fs, Path path) + throws IOException, JsonParseException, JsonMappingException { + FileStatus status = fs.getFileStatus(path); + long len = status.getLen(); + byte[] b = new byte[(int) len]; + FSDataInputStream dataInputStream = fs.open(path); + int count = dataInputStream.read(b); + if (count != len) { + throw new EOFException("Read of " + path +" finished prematurely"); + } + return fromBytes(b); + } + + + /** + * Save to a hadoop filesystem + * @param fs filesystem + * @param path path + * @param instance instance to save + * @param overwrite should any existing file be overwritten + * @throws IOException IO exception + */ + public void save(FileSystem fs, Path path, T instance, + boolean overwrite) throws + IOException { + FSDataOutputStream dataOutputStream = fs.create(path, overwrite); + writeJsonAsBytes(instance, dataOutputStream); + } + + /** + * Save an instance to a file + * @param instance instance to save + * @param file file + * @throws IOException + */ + public void save(T instance, File file) throws + IOException { + writeJsonAsBytes(instance, new FileOutputStream(file.getAbsoluteFile())); + } + + /** + * Write the json as bytes -then close the file + * @param dataOutputStream an outout stream that will always be closed + * @throws IOException on any failure + */ + private void writeJsonAsBytes(T instance, + OutputStream dataOutputStream) throws IOException { + try { + String json = toJson(instance); + byte[] b = json.getBytes(UTF_8); + dataOutputStream.write(b); + dataOutputStream.flush(); + dataOutputStream.close(); + } finally { + IOUtils.closeStream(dataOutputStream); + } + } + + /** + * Convert an object to a JSON string + * @param instance instance to convert + * @return a JSON string description + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public String toJson(T instance) throws IOException, + JsonGenerationException, + JsonMappingException { + mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); + return mapper.writeValueAsString(instance); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org