http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java new file mode 100644 index 0000000..9c0a983 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -0,0 +1,584 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.codehaus.jackson.annotate.JsonSetter; + +/** + * The basic timeline entity data structure for timeline service v2. Timeline + * entity objects are not thread safe and should not be accessed concurrently. + * All collection members will be initialized into empty collections. Two + * timeline entities are equal iff. their type and id are identical. + * + * All non-primitive type, non-collection members will be initialized into null. + * User should set the type and id of a timeline entity to make it valid (can be + * checked by using the {@link #isValid()} method). Callers to the getters + * should perform null checks for non-primitive type, non-collection members. + * + * Callers are recommended not to alter the returned collection objects from the + * getters. + */ +@XmlRootElement(name = "entity") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineEntity implements Comparable<TimelineEntity> { + protected final static String SYSTEM_INFO_KEY_PREFIX = "SYSTEM_INFO_"; + + /** + * Identifier of timeline entity(entity id + entity type). 
+ */ + @XmlRootElement(name = "identifier") + @XmlAccessorType(XmlAccessType.NONE) + public static class Identifier { + private String type; + private String id; + + public Identifier(String type, String id) { + this.type = type; + this.id = id; + } + + public Identifier() { + + } + + @XmlElement(name = "type") + public String getType() { + return type; + } + + public void setType(String entityType) { + this.type = entityType; + } + + @XmlElement(name = "id") + public String getId() { + return id; + } + + public void setId(String entityId) { + this.id = entityId; + } + + @Override + public String toString() { + return "TimelineEntity[" + + "type='" + type + '\'' + + ", id='" + id + '\'' + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((id == null) ? 0 : id.hashCode()); + result = + prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof Identifier)) { + return false; + } + Identifier other = (Identifier) obj; + if (id == null) { + if (other.getId() != null) { + return false; + } + } else if (!id.equals(other.getId())) { + return false; + } + if (type == null) { + if (other.getType() != null) { + return false; + } + } else if (!type.equals(other.getType())) { + return false; + } + return true; + } + } + + private TimelineEntity real; + private Identifier identifier; + private HashMap<String, Object> info = new HashMap<>(); + private HashMap<String, String> configs = new HashMap<>(); + private Set<TimelineMetric> metrics = new HashSet<>(); + // events should be sorted by timestamp in descending order + private NavigableSet<TimelineEvent> events = new TreeSet<>(); + private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>(); + private HashMap<String, Set<String>> relatesToEntities = new HashMap<>(); + private Long createdTime; + + public 
TimelineEntity() { + identifier = new Identifier(); + } + + /** + * <p> + * The constuctor is used to construct a proxy {@link TimelineEntity} or its + * subclass object from the real entity object that carries information. + * </p> + * + * <p> + * It is usually used in the case where we want to recover class polymorphism + * after deserializing the entity from its JSON form. + * </p> + * @param entity the real entity that carries information + */ + public TimelineEntity(TimelineEntity entity) { + real = entity.getReal(); + } + + protected TimelineEntity(String type) { + this(); + identifier.type = type; + } + + @XmlElement(name = "type") + public String getType() { + if (real == null) { + return identifier.type; + } else { + return real.getType(); + } + } + + public void setType(String type) { + if (real == null) { + identifier.type = type; + } else { + real.setType(type); + } + } + + @XmlElement(name = "id") + public String getId() { + if (real == null) { + return identifier.id; + } else { + return real.getId(); + } + } + + public void setId(String id) { + if (real == null) { + identifier.id = id; + } else { + real.setId(id); + } + } + + public Identifier getIdentifier() { + if (real == null) { + return identifier; + } else { + return real.getIdentifier(); + } + } + + public void setIdentifier(Identifier entityIdentifier) { + if (real == null) { + this.identifier = entityIdentifier; + } else { + real.setIdentifier(entityIdentifier); + } + } + + // required by JAXB + @InterfaceAudience.Private + @XmlElement(name = "info") + public HashMap<String, Object> getInfoJAXB() { + if (real == null) { + return info; + } else { + return real.getInfoJAXB(); + } + } + + public Map<String, Object> getInfo() { + if (real == null) { + return info; + } else { + return real.getInfo(); + } + } + + public void setInfo(Map<String, Object> entityInfos) { + if (real == null) { + this.info = TimelineServiceHelper.mapCastToHashMap(entityInfos); + } else { + real.setInfo(entityInfos); + } 
+ } + + public void addInfo(Map<String, Object> entityInfos) { + if (real == null) { + this.info.putAll(entityInfos); + } else { + real.addInfo(entityInfos); + } + } + + public void addInfo(String key, Object value) { + if (real == null) { + info.put(key, value); + } else { + real.addInfo(key, value); + } + } + + // required by JAXB + @InterfaceAudience.Private + @XmlElement(name = "configs") + public HashMap<String, String> getConfigsJAXB() { + if (real == null) { + return configs; + } else { + return real.getConfigsJAXB(); + } + } + + public Map<String, String> getConfigs() { + if (real == null) { + return configs; + } else { + return real.getConfigs(); + } + } + + public void setConfigs(Map<String, String> entityConfigs) { + if (real == null) { + this.configs = TimelineServiceHelper.mapCastToHashMap(entityConfigs); + } else { + real.setConfigs(entityConfigs); + } + } + + public void addConfigs(Map<String, String> entityConfigs) { + if (real == null) { + this.configs.putAll(entityConfigs); + } else { + real.addConfigs(entityConfigs); + } + } + + public void addConfig(String key, String value) { + if (real == null) { + configs.put(key, value); + } else { + real.addConfig(key, value); + } + } + + @XmlElement(name = "metrics") + public Set<TimelineMetric> getMetrics() { + if (real == null) { + return metrics; + } else { + return real.getMetrics(); + } + } + + public void setMetrics(Set<TimelineMetric> entityMetrics) { + if (real == null) { + this.metrics = entityMetrics; + } else { + real.setMetrics(entityMetrics); + } + } + + public void addMetrics(Set<TimelineMetric> entityMetrics) { + if (real == null) { + this.metrics.addAll(entityMetrics); + } else { + real.addMetrics(entityMetrics); + } + } + + public void addMetric(TimelineMetric metric) { + if (real == null) { + metrics.add(metric); + } else { + real.addMetric(metric); + } + } + + @XmlElement(name = "events") + public NavigableSet<TimelineEvent> getEvents() { + if (real == null) { + return events; + } else { 
+ return real.getEvents(); + } + } + + public void setEvents(NavigableSet<TimelineEvent> entityEvents) { + if (real == null) { + this.events = entityEvents; + } else { + real.setEvents(entityEvents); + } + } + + public void addEvents(Set<TimelineEvent> entityEvents) { + if (real == null) { + this.events.addAll(entityEvents); + } else { + real.addEvents(entityEvents); + } + } + + public void addEvent(TimelineEvent event) { + if (real == null) { + events.add(event); + } else { + real.addEvent(event); + } + } + + public Map<String, Set<String>> getIsRelatedToEntities() { + if (real == null) { + return isRelatedToEntities; + } else { + return real.getIsRelatedToEntities(); + } + } + + // required by JAXB + @InterfaceAudience.Private + @XmlElement(name = "isrelatedto") + public HashMap<String, Set<String>> getIsRelatedToEntitiesJAXB() { + if (real == null) { + return isRelatedToEntities; + } else { + return real.getIsRelatedToEntitiesJAXB(); + } + } + + @JsonSetter("isrelatedto") + public void setIsRelatedToEntities( + Map<String, Set<String>> isRelatedTo) { + if (real == null) { + this.isRelatedToEntities = + TimelineServiceHelper.mapCastToHashMap(isRelatedTo); + } else { + real.setIsRelatedToEntities(isRelatedTo); + } + } + + public void addIsRelatedToEntities( + Map<String, Set<String>> isRelatedTo) { + if (real == null) { + for (Map.Entry<String, Set<String>> entry : isRelatedTo.entrySet()) { + Set<String> ids = this.isRelatedToEntities.get(entry.getKey()); + if (ids == null) { + ids = new HashSet<>(); + this.isRelatedToEntities.put(entry.getKey(), ids); + } + ids.addAll(entry.getValue()); + } + } else { + real.addIsRelatedToEntities(isRelatedTo); + } + } + + public void addIsRelatedToEntity(String type, String id) { + if (real == null) { + Set<String> ids = isRelatedToEntities.get(type); + if (ids == null) { + ids = new HashSet<>(); + isRelatedToEntities.put(type, ids); + } + ids.add(id); + } else { + real.addIsRelatedToEntity(type, id); + } + } + + // required by 
JAXB + @InterfaceAudience.Private + @XmlElement(name = "relatesto") + public HashMap<String, Set<String>> getRelatesToEntitiesJAXB() { + if (real == null) { + return relatesToEntities; + } else { + return real.getRelatesToEntitiesJAXB(); + } + } + + public Map<String, Set<String>> getRelatesToEntities() { + if (real == null) { + return relatesToEntities; + } else { + return real.getRelatesToEntities(); + } + } + + public void addRelatesToEntities(Map<String, Set<String>> relatesTo) { + if (real == null) { + for (Map.Entry<String, Set<String>> entry : relatesTo.entrySet()) { + Set<String> ids = this.relatesToEntities.get(entry.getKey()); + if (ids == null) { + ids = new HashSet<>(); + this.relatesToEntities.put(entry.getKey(), ids); + } + ids.addAll(entry.getValue()); + } + } else { + real.addRelatesToEntities(relatesTo); + } + } + + public void addRelatesToEntity(String type, String id) { + if (real == null) { + Set<String> ids = relatesToEntities.get(type); + if (ids == null) { + ids = new HashSet<>(); + relatesToEntities.put(type, ids); + } + ids.add(id); + } else { + real.addRelatesToEntity(type, id); + } + } + + @JsonSetter("relatesto") + public void setRelatesToEntities(Map<String, Set<String>> relatesTo) { + if (real == null) { + this.relatesToEntities = + TimelineServiceHelper.mapCastToHashMap(relatesTo); + } else { + real.setRelatesToEntities(relatesTo); + } + } + + @XmlElement(name = "createdtime") + public Long getCreatedTime() { + if (real == null) { + return createdTime; + } else { + return real.getCreatedTime(); + } + } + + @JsonSetter("createdtime") + public void setCreatedTime(Long createdTs) { + if (real == null) { + this.createdTime = createdTs; + } else { + real.setCreatedTime(createdTs); + } + } + + /** + * Set UID in info which will be then used for query by UI. + * @param uidKey key for UID in info. + * @param uId UID to be set for the key. 
+ */ + public void setUID(String uidKey, String uId) { + if (real == null) { + info.put(uidKey, uId); + } else { + real.addInfo(uidKey, uId); + } + } + + public boolean isValid() { + return (getId() != null && getType() != null); + } + + // When get hashCode for a timeline entity, or check if two timeline entities + // are equal, we only compare their identifiers (id and type) + @Override + public int hashCode() { + return getIdentifier().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof TimelineEntity)) { + return false; + } + TimelineEntity other = (TimelineEntity) obj; + return getIdentifier().equals(other.getIdentifier()); + } + + @Override + public int compareTo(TimelineEntity other) { + int comparison = getType().compareTo(other.getType()); + if (comparison == 0) { + if (getCreatedTime() == null) { + if (other.getCreatedTime() == null) { + return getId().compareTo(other.getId()); + } else { + return 1; + } + } + if (other.getCreatedTime() == null) { + return -1; + } + if (getCreatedTime() > other.getCreatedTime()) { + // Order by created time desc + return -1; + } else if (getCreatedTime() < other.getCreatedTime()) { + return 1; + } else { + return getId().compareTo(other.getId()); + } + } else { + return comparison; + } + } + + protected TimelineEntity getReal() { + return real == null ? this : real; + } + + public String toString() { + if (real == null) { + return identifier.toString(); + } else { + return real.toString(); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java new file mode 100644 index 0000000..8fcc2ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Defines type of entity. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public enum TimelineEntityType { + YARN_CLUSTER, + YARN_FLOW_RUN, + YARN_APPLICATION, + YARN_APPLICATION_ATTEMPT, + YARN_CONTAINER, + YARN_USER, + YARN_QUEUE, + YARN_FLOW_ACTIVITY; + + /** + * Whether the input type can be a parent of this entity. + * + * @param type entity type. + * @return true, if this entity type is parent of passed entity type, false + * otherwise. + */ + public boolean isParent(TimelineEntityType type) { + switch (this) { + case YARN_CLUSTER: + return false; + case YARN_FLOW_RUN: + return YARN_FLOW_RUN == type || YARN_CLUSTER == type; + case YARN_APPLICATION: + return YARN_FLOW_RUN == type || YARN_CLUSTER == type; + case YARN_APPLICATION_ATTEMPT: + return YARN_APPLICATION == type; + case YARN_CONTAINER: + return YARN_APPLICATION_ATTEMPT == type; + case YARN_QUEUE: + return YARN_QUEUE == type; + default: + return false; + } + } + + /** + * Whether the input type can be a child of this entity. + * + * @param type entity type. + * @return true, if this entity type is child of passed entity type, false + * otherwise. + */ + public boolean isChild(TimelineEntityType type) { + switch (this) { + case YARN_CLUSTER: + return YARN_FLOW_RUN == type || YARN_APPLICATION == type; + case YARN_FLOW_RUN: + return YARN_FLOW_RUN == type || YARN_APPLICATION == type; + case YARN_APPLICATION: + return YARN_APPLICATION_ATTEMPT == type; + case YARN_APPLICATION_ATTEMPT: + return YARN_CONTAINER == type; + case YARN_CONTAINER: + return false; + case YARN_QUEUE: + return YARN_QUEUE == type; + default: + return false; + } + } + + /** + * Whether the type of this entity matches the type indicated by the input + * argument. + * + * @param typeString entity type represented as a string. + * @return true, if string representation of this entity type matches the + * entity type passed. 
+ */ + public boolean matches(String typeString) { + return toString().equals(typeString); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java new file mode 100644 index 0000000..87fc291 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.HashMap; +import java.util.Map; + +/** + * This class contains the information of an event that belongs to an entity. + * Users are free to define what the event means, such as starting an + * application, container being allocated, etc. + */ +@XmlRootElement(name = "event") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineEvent implements Comparable<TimelineEvent> { + public static final long INVALID_TIMESTAMP = 0L; + + private String id; + private HashMap<String, Object> info = new HashMap<>(); + private long timestamp; + + public TimelineEvent() { + + } + + @XmlElement(name = "id") + public String getId() { + return id; + } + + public void setId(String eventId) { + this.id = eventId; + } + + // required by JAXB + @InterfaceAudience.Private + @XmlElement(name = "info") + public HashMap<String, Object> getInfoJAXB() { + return info; + } + + public Map<String, Object> getInfo() { + return info; + } + + public void setInfo(Map<String, Object> infos) { + this.info = TimelineServiceHelper.mapCastToHashMap(infos); + } + + public void addInfo(Map<String, Object> infos) { + this.info.putAll(infos); + } + + public void addInfo(String key, Object value) { + info.put(key, value); + } + + @XmlElement(name = "timestamp") + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long ts) { + this.timestamp = ts; + } + + public boolean isValid() { + return (id != null && timestamp != INVALID_TIMESTAMP); + } + + @Override + public 
int hashCode() { + int result = (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + id.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TimelineEvent)) { + return false; + } + + TimelineEvent event = (TimelineEvent) o; + + if (timestamp != event.timestamp) { + return false; + } + if (!id.equals(event.id)) { + return false; + } + return true; + } + + @Override + public int compareTo(TimelineEvent other) { + if (timestamp > other.timestamp) { + return -1; + } else if (timestamp < other.timestamp) { + return 1; + } else { + return id.compareTo(other.id); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java new file mode 100644 index 0000000..5c908d6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +/** + * This class contains the information of a metric that is related to some + * entity. Metric can either be a time series or single value. + */ +@XmlRootElement(name = "metric") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineMetric { + + /** + * Type of metric. + */ + public static enum Type { + SINGLE_VALUE, + TIME_SERIES + } + + private Type type; + private String id; + // By default, not to do any aggregation operations. This field will NOT be + // persisted (like a "transient" member). 
+ private TimelineMetricOperation realtimeAggregationOp + = TimelineMetricOperation.NOP; + + private TreeMap<Long, Number> values + = new TreeMap<>(Collections.reverseOrder()); + + public TimelineMetric() { + this(Type.SINGLE_VALUE); + } + + public TimelineMetric(Type type) { + this.type = type; + } + + + @XmlElement(name = "type") + public Type getType() { + return type; + } + + public void setType(Type metricType) { + this.type = metricType; + } + + @XmlElement(name = "id") + public String getId() { + return id; + } + + public void setId(String metricId) { + this.id = metricId; + } + + /** + * Get the real time aggregation operation of this metric. + * + * @return Real time aggregation operation + */ + // required by JAXB + @XmlElement(name = "aggregationOp") + public TimelineMetricOperation getRealtimeAggregationOp() { + return realtimeAggregationOp; + } + + /** + * Set the real time aggregation operation of this metric. + * + * @param op A timeline metric operation that the metric should perform on + * real time aggregations + */ + public void setRealtimeAggregationOp( + final TimelineMetricOperation op) { + this.realtimeAggregationOp = op; + } + + // required by JAXB + @InterfaceAudience.Private + @XmlElement(name = "values") + public TreeMap<Long, Number> getValuesJAXB() { + return values; + } + + public Map<Long, Number> getValues() { + return values; + } + + public void setValues(Map<Long, Number> vals) { + if (type == Type.SINGLE_VALUE) { + overwrite(vals); + } else { + if (vals != null) { + this.values = new TreeMap<>(Collections.reverseOrder()); + this.values.putAll(vals); + } else { + this.values = null; + } + } + } + + public void addValues(Map<Long, Number> vals) { + if (type == Type.SINGLE_VALUE) { + overwrite(vals); + } else { + this.values.putAll(vals); + } + } + + public void addValue(long timestamp, Number value) { + if (type == Type.SINGLE_VALUE) { + values.clear(); + } + values.put(timestamp, value); + } + + private void overwrite(Map<Long, 
Number> vals) { + if (vals.size() > 1) { + throw new IllegalArgumentException( + "Values cannot contain more than one point in " + + Type.SINGLE_VALUE + " mode"); + } + this.values.clear(); + this.values.putAll(vals); + } + + public boolean isValid() { + return (id != null); + } + + @Override + public int hashCode() { + int result = id.hashCode(); + result = 31 * result + type.hashCode(); + return result; + } + + // Only check if type and id are equal + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TimelineMetric)) { + return false; + } + + TimelineMetric m = (TimelineMetric) o; + + if (!id.equals(m.id)) { + return false; + } + if (type != m.type) { + return false; + } + return true; + } + + @Override + public String toString() { + return "{id: " + id + ", type: " + type + + ", realtimeAggregationOp: " + + realtimeAggregationOp + "; " + values.toString() + + "}"; + } + + /** + * Get the latest timeline metric as single value type. + * + * @param metric Incoming timeline metric + * @return The latest metric in the incoming metric + */ + public static TimelineMetric getLatestSingleValueMetric( + TimelineMetric metric) { + if (metric.getType() == Type.SINGLE_VALUE) { + return metric; + } else { + TimelineMetric singleValueMetric = new TimelineMetric(Type.SINGLE_VALUE); + Long firstKey = metric.values.firstKey(); + if (firstKey != null) { + Number firstValue = metric.values.get(firstKey); + singleValueMetric.addValue(firstKey, firstValue); + } + return singleValueMetric; + } + } + + /** + * Get single data timestamp of the metric. 
+ * + * @return the single data timestamp + */ + public long getSingleDataTimestamp() { + if (this.type == Type.SINGLE_VALUE) { + if (values.size() == 0) { + throw new YarnRuntimeException("Values for this timeline metric is " + + "empty."); + } else { + return values.firstKey(); + } + } else { + throw new YarnRuntimeException("Type for this timeline metric is not " + + "SINGLE_VALUE."); + } + } + + /** + * Get single data value of the metric. + * + * @return the single data value + */ + public Number getSingleDataValue() { + if (this.type == Type.SINGLE_VALUE) { + if (values.size() == 0) { + return null; + } else { + return values.get(values.firstKey()); + } + } else { + throw new YarnRuntimeException("Type for this timeline metric is not " + + "SINGLE_VALUE."); + } + } + + /** + * Aggregate an incoming metric to the base aggregated metric with the given + * operation state in a stateless fashion. The assumption here is + * baseAggregatedMetric and latestMetric should be single value data if not + * null. + * + * @param incomingMetric Incoming timeline metric to aggregate + * @param baseAggregatedMetric Base timeline metric + * @return Result metric after aggregation + */ + public static TimelineMetric aggregateTo(TimelineMetric incomingMetric, + TimelineMetric baseAggregatedMetric) { + return aggregateTo(incomingMetric, baseAggregatedMetric, null); + } + + /** + * Aggregate an incoming metric to the base aggregated metric with the given + * operation state. The assumption here is baseAggregatedMetric and + * latestMetric should be single value data if not null. 
+ * + * @param incomingMetric Incoming timeline metric to aggregate + * @param baseAggregatedMetric Base timeline metric + * @param state Operation state + * @return Result metric after aggregation + */ + public static TimelineMetric aggregateTo(TimelineMetric incomingMetric, + TimelineMetric baseAggregatedMetric, Map<Object, Object> state) { + TimelineMetricOperation operation + = incomingMetric.getRealtimeAggregationOp(); + return operation.aggregate(incomingMetric, baseAggregatedMetric, state); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java new file mode 100644 index 0000000..4c9045f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +/** + * A calculator for timeline metrics. + */ +public final class TimelineMetricCalculator { + + private TimelineMetricCalculator() { + // do nothing. + } + + /** + * Compare two not-null numbers. + * @param n1 Number n1 + * @param n2 Number n2 + * @return 0 if n1 equals n2, a negative int if n1 is less than n2, a + * positive int otherwise. + */ + public static int compare(Number n1, Number n2) { + if (n1 == null || n2 == null) { + throw new YarnRuntimeException( + "Number to be compared shouldn't be null."); + } + + if (n1 instanceof Integer || n1 instanceof Long) { + if (n1.longValue() == n2.longValue()) { + return 0; + } else { + return (n1.longValue() < n2.longValue()) ? -1 : 1; + } + } + + if (n1 instanceof Float || n1 instanceof Double) { + if (n1.doubleValue() == n2.doubleValue()) { + return 0; + } else { + return (n1.doubleValue() < n2.doubleValue()) ? -1 : 1; + } + } + + // TODO throw warnings/exceptions for other types of number. + throw new YarnRuntimeException("Unsupported types for number comparison: " + + n1.getClass().getName() + ", " + n2.getClass().getName()); + } + + /** + * Subtract operation between two Numbers. + * @param n1 Number n1 + * @param n2 Number n2 + * @return Number represent to (n1 - n2). 
+ */ + public static Number sub(Number n1, Number n2) { + if (n1 == null) { + throw new YarnRuntimeException( + "Number to be subtracted shouldn't be null."); + } else if (n2 == null) { + return n1; + } + + if (n1 instanceof Integer || n1 instanceof Long) { + return n1.longValue() - n2.longValue(); + } + + if (n1 instanceof Float || n1 instanceof Double) { + return n1.doubleValue() - n2.doubleValue(); + } + + // TODO throw warnings/exceptions for other types of number. + return null; + } + + /** + * Sum up two Numbers. + * @param n1 Number n1 + * @param n2 Number n2 + * @return Number represent to (n1 + n2). + */ + public static Number sum(Number n1, Number n2) { + if (n1 == null) { + return n2; + } else if (n2 == null) { + return n1; + } + + if (n1 instanceof Integer || n1 instanceof Long) { + return n1.longValue() + n2.longValue(); + } + + if (n1 instanceof Float || n1 instanceof Double) { + return n1.doubleValue() + n2.doubleValue(); + } + + // TODO throw warnings/exceptions for other types of number. + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java new file mode 100644 index 0000000..58e5c38 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.timelineservice; + +import java.util.Map; + +/** + * Aggregation operations. + */ +public enum TimelineMetricOperation { + NOP("NOP") { + /** + * Do nothing on the base metric. + * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (not used) + * @return Metric b + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, Map<Object, Object> state) { + return base; + } + }, + MAX("MAX") { + /** + * Keep the greater value of incoming and base. Stateless operation. 
+ * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (not used) + * @return the greater value of a and b + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, Map<Object, Object> state) { + if (base == null) { + return incoming; + } + Number incomingValue = incoming.getSingleDataValue(); + Number aggregateValue = base.getSingleDataValue(); + if (aggregateValue == null) { + aggregateValue = Long.MIN_VALUE; + } + if (TimelineMetricCalculator.compare(incomingValue, aggregateValue) > 0) { + base.addValue(incoming.getSingleDataTimestamp(), incomingValue); + } + return base; + } + }, + REPLACE("REPLACE") { + /** + * Replace the base metric with the incoming value. Stateless operation. + * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (not used) + * @return Metric a + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, + Map<Object, Object> state) { + return incoming; + } + }, + SUM("SUM") { + /** + * Return the sum of the incoming metric and the base metric if the + * operation is stateless. For stateful operations, also subtract the + * value of the timeline metric mapped to the PREV_METRIC_STATE_KEY + * in the state object. 
+ * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (PREV_METRIC_STATE_KEY's value as Metric p) + * @return A metric with value a + b - p + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, + Map<Object, Object> state) { + if (base == null) { + return incoming; + } + Number incomingValue = incoming.getSingleDataValue(); + Number aggregateValue = base.getSingleDataValue(); + Number result + = TimelineMetricCalculator.sum(incomingValue, aggregateValue); + + // If there are previous value in the state, we will take it off from the + // sum + if (state != null) { + Object prevMetric = state.get(PREV_METRIC_STATE_KEY); + if (prevMetric instanceof TimelineMetric) { + result = TimelineMetricCalculator.sub(result, + ((TimelineMetric) prevMetric).getSingleDataValue()); + } + } + base.addValue(incoming.getSingleDataTimestamp(), result); + return base; + } + }, + AVG("AVERAGE") { + /** + * Return the average value of the incoming metric and the base metric, + * with a given state. Not supported yet. + * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state + * @return Not finished yet + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, + Map<Object, Object> state) { + // Not supported yet + throw new UnsupportedOperationException( + "Unsupported aggregation operation: AVERAGE"); + } + }; + + public static final String PREV_METRIC_STATE_KEY = "PREV_METRIC"; + + /** + * Perform the aggregation operation. 
+ * + * @param incoming Incoming metric + * @param aggregate Base aggregation metric + * @param state Operation state + * @return Result metric for this aggregation operation + */ + public TimelineMetric aggregate(TimelineMetric incoming, + TimelineMetric aggregate, Map<Object, Object> state) { + return exec(incoming, aggregate, state); + } + + private final String opName; + + TimelineMetricOperation(String opString) { + opName = opString; + } + + @Override + public String toString() { + return this.opName; + } + + abstract TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, + Map<Object, Object> state); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java new file mode 100644 index 0000000..eda1ee2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/** + * A class that holds a list of put errors. This is the response returned when a + * list of {@link TimelineEntity} objects is added to the timeline. If there are + * errors in storing individual entity objects, they will be indicated in the + * list of errors. + */ +@XmlRootElement(name = "response") +@XmlAccessorType(XmlAccessType.NONE) +@Public +@Unstable +public class TimelineWriteResponse { + + private List<TimelineWriteError> errors = new ArrayList<TimelineWriteError>(); + + public TimelineWriteResponse() { + + } + + /** + * Get a list of {@link TimelineWriteError} instances. + * + * @return a list of {@link TimelineWriteError} instances + */ + @XmlElement(name = "errors") + public List<TimelineWriteError> getErrors() { + return errors; + } + + /** + * Add a single {@link TimelineWriteError} instance into the existing list. + * + * @param error + * a single {@link TimelineWriteError} instance + */ + public void addError(TimelineWriteError error) { + errors.add(error); + } + + /** + * Add a list of {@link TimelineWriteError} instances into the existing list. 
+ * + * @param writeErrors + * a list of {@link TimelineWriteError} instances + */ + public void addErrors(List<TimelineWriteError> writeErrors) { + this.errors.addAll(writeErrors); + } + + /** + * Set the list to the given list of {@link TimelineWriteError} instances. + * + * @param writeErrors + * a list of {@link TimelineWriteError} instances + */ + public void setErrors(List<TimelineWriteError> writeErrors) { + this.errors.clear(); + this.errors.addAll(writeErrors); + } + + /** + * A class that holds the error code for one entity. + */ + @XmlRootElement(name = "error") + @XmlAccessorType(XmlAccessType.NONE) + @Public + @Unstable + public static class TimelineWriteError { + + /** + * Error code returned if an IOException is encountered when storing an + * entity. + */ + public static final int IO_EXCEPTION = 1; + + private String entityId; + private String entityType; + private int errorCode; + + /** + * Get the entity Id. + * + * @return the entity Id + */ + @XmlElement(name = "entity") + public String getEntityId() { + return entityId; + } + + /** + * Set the entity Id. + * + * @param id the entity Id. + */ + public void setEntityId(String id) { + this.entityId = id; + } + + /** + * Get the entity type. + * + * @return the entity type + */ + @XmlElement(name = "entitytype") + public String getEntityType() { + return entityType; + } + + /** + * Set the entity type. + * + * @param type the entity type. + */ + public void setEntityType(String type) { + this.entityType = type; + } + + /** + * Get the error code. + * + * @return an error code + */ + @XmlElement(name = "errorcode") + public int getErrorCode() { + return errorCode; + } + + /** + * Set the error code to the given error code. + * + * @param code an error code. 
+ */ + public void setErrorCode(int code) { + this.errorCode = code; + } + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/UserEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/UserEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/UserEntity.java new file mode 100644 index 0000000..ced57c6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/UserEntity.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This entity represents a user. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class UserEntity extends TimelineEntity { + public UserEntity() { + super(TimelineEntityType.YARN_USER.toString()); + } + + public UserEntity(TimelineEntity entity) { + super(entity); + if (!entity.getType().equals(TimelineEntityType.YARN_USER.toString())) { + throw new IllegalArgumentException("Incompatible entity type: " + + getId()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/package-info.java new file mode 100644 index 0000000..43805c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Package org.apache.hadoop.yarn.api.records.timelineservice contains classes + * which define the data model for ATSv2. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 26033d3..2dbe48c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -83,6 +83,10 @@ public class YarnConfiguration extends Configuration { new DeprecationDelta("yarn.client.max-nodemanagers-proxies", NM_CLIENT_MAX_NM_PROXIES) }); + Configuration.addDeprecations(new DeprecationDelta[] { + new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + SYSTEM_METRICS_PUBLISHER_ENABLED) + }); } //Configurations @@ -133,6 +137,7 @@ public class YarnConfiguration extends Configuration { public static final String RM_PREFIX = "yarn.resourcemanager."; public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id"; + public static final String DEFAULT_RM_CLUSTER_ID = "yarn_cluster"; public static final String RM_HOSTNAME = RM_PREFIX + "hostname"; @@ -485,16 +490,37 @@ public class YarnConfiguration extends Configuration { /** * The setting that controls whether yarn system metrics is published on the - * timeline server or not by RM. + * timeline server or not by RM. This configuration setting is for ATS V1. 
+ * This is now deprecated in favor of SYSTEM_METRICS_PUBLISHER_ENABLED. + */ + public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX + + "system-metrics-publisher.enabled"; + public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = + false; + + /** + * The setting that controls whether yarn system metrics is published on the + * timeline server or not by RM and NM. This configuration setting is for + * ATS v2. */ - public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = - RM_PREFIX + "system-metrics-publisher.enabled"; - public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = false; + public static final String SYSTEM_METRICS_PUBLISHER_ENABLED = YARN_PREFIX + + "system-metrics-publisher.enabled"; + public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false; + + /** + * The setting that controls whether yarn container events are published to + * the timeline service or not by RM. This configuration setting is for ATS + * V2 + */ + public static final String RM_PUBLISH_CONTAINER_EVENTS_ENABLED = YARN_PREFIX + + "rm.system-metrics-publisher.emit-container-events"; + public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_EVENTS_ENABLED = + false; public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size"; - public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = - 10; + public static final int + DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10; //RM delegation token related keys public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY = @@ -813,6 +839,11 @@ public class YarnConfiguration extends Configuration { NM_PREFIX + "container-manager.thread-count"; public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; + /** Number of threads collector service uses.*/ + public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT = + NM_PREFIX + 
"collector-service.thread-count"; + public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5; + /** Number of threads used in cleanup.*/ public static final String NM_DELETE_THREAD_COUNT = NM_PREFIX + "delete.thread-count"; @@ -840,6 +871,13 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; + /** Address where the collector service IPC is.*/ + public static final String NM_COLLECTOR_SERVICE_ADDRESS = + NM_PREFIX + "collector-service.address"; + public static final int DEFAULT_NM_COLLECTOR_SERVICE_PORT = 8048; + public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS = + "0.0.0.0:" + DEFAULT_NM_COLLECTOR_SERVICE_PORT; + /** Interval in between cache cleanups.*/ public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = NM_PREFIX + "localizer.cache.cleanup.interval-ms"; @@ -1791,7 +1829,7 @@ public class YarnConfiguration extends Configuration { public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX = TIMELINE_SERVICE_PREFIX + "ui-web-path."; - /** Timeline client settings */ + /** Timeline client settings. */ public static final String TIMELINE_SERVICE_CLIENT_PREFIX = TIMELINE_SERVICE_PREFIX + "client."; @@ -1927,6 +1965,53 @@ public class YarnConfiguration extends Configuration { = TIMELINE_SERVICE_PREFIX + "entity-file.fs-support-append"; + /** + * Settings for timeline service v2.0 + */ + public static final String TIMELINE_SERVICE_WRITER_CLASS = + TIMELINE_SERVICE_PREFIX + "writer.class"; + + public static final String TIMELINE_SERVICE_READER_CLASS = + TIMELINE_SERVICE_PREFIX + "reader.class"; + + /** The setting that controls how often the timeline collector flushes the + * timeline writer. 
+ */ + public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = + TIMELINE_SERVICE_PREFIX + "writer.flush-interval-seconds"; + + public static final int + DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60; + + /** + * The name for setting that controls how long the final value of + * a metric of a completed app is retained before merging + * into the flow sum. + */ + public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD = + TIMELINE_SERVICE_PREFIX + + "hbase.coprocessor.app-final-value-retention-milliseconds"; + + /** + * The setting that controls how long the final value of a metric of a + * completed app is retained before merging into the flow sum. Up to this time + * after an application is completed out-of-order values that arrive can be + * recognized and discarded at the cost of increased storage. + */ + public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24 + * 60 * 60 * 1000L; + + public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = + TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms"; + + public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000; + + public static final String NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = + TIMELINE_SERVICE_PREFIX + + "timeline-client.number-of-async-entities-to-merge"; + + public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private @@ -1985,6 +2070,7 @@ public class YarnConfiguration extends Configuration { /** The listening endpoint for the timeline service application.*/ public static final String TIMELINE_SERVICE_BIND_HOST = TIMELINE_SERVICE_PREFIX + "bind-host"; + public static final String DEFAULT_TIMELINE_SERVICE_BIND_HOST = "0.0.0.0"; /** The number of threads to handle client RPC API requests. 
*/ public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT = @@ -2180,6 +2266,16 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME = 7*24*60*60*1000; // 7 days + // Timeline service v2 offline aggregation related keys + public static final String TIMELINE_OFFLINE_AGGREGATION_PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "aggregation.offline."; + public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR + = TIMELINE_OFFLINE_AGGREGATION_PREFIX + + "phoenix.connectionString"; + + public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT + = "jdbc:phoenix:localhost:2181:/hbase"; + // /////////////////////////////// // Shared Cache Configs // /////////////////////////////// @@ -2722,6 +2818,53 @@ public class YarnConfiguration extends Configuration { return clusterId; } + // helper methods for timeline service configuration + /** + * Returns whether the timeline service is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service is enabled. + */ + public static boolean timelineServiceEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + } + + /** + * Returns the timeline service version. It does not check whether the + * timeline service itself is enabled. + * + * @param conf the configuration + * @return the timeline service version as a float. + */ + public static float getTimelineServiceVersion(Configuration conf) { + return conf.getFloat(TIMELINE_SERVICE_VERSION, + DEFAULT_TIMELINE_SERVICE_VERSION); + } + + /** + * Returns whether the timeline service v.2 is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service v.2 is enabled. V.2 refers to a + * version greater than or equal to 2 but smaller than 3. 
+ */ + public static boolean timelineServiceV2Enabled(Configuration conf) { + return timelineServiceEnabled(conf) && + (int)getTimelineServiceVersion(conf) == 2; + } + + /** + * Returns whether the system publisher is enabled. + * + * @param conf the configuration + * @return whether the system publisher is enabled. + */ + public static boolean systemMetricsPublisherEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); + } + /* For debugging. mp configurations to system output as XML format. */ public static void main(String[] args) throws Exception { new YarnConfiguration(new Configuration()).writeXml(System.out); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java new file mode 100644 index 0000000..e0268a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; + +/** + * Helper class for Timeline service. + */ +@LimitedPrivate({ "MapReduce", "YARN" }) +public final class TimelineServiceHelper { + + private TimelineServiceHelper() { + // Utility classes should not have a public or default constructor. + } + + /** + * Cast map to HashMap for generic type. + * @param originalMap the map need to be casted + * @param <E> key type + * @param <V> value type + * @return casted HashMap object + */ + public static <E, V> HashMap<E, V> mapCastToHashMap( + Map<E, V> originalMap) { + return originalMap == null ? null : originalMap instanceof HashMap ? 
+        (HashMap<E, V>) originalMap : new HashMap<E, V>(originalMap);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 7070e38..dddb0f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -89,6 +89,7 @@ message AllocateResponseProto {
   repeated ContainerProto decreased_containers = 11;
   optional hadoop.common.TokenProto am_rm_token = 12;
   optional PriorityProto application_priority = 13;
+  optional string collector_addr = 14;
 }

 enum SchedulerResourceTypes {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
new file mode 100644
index 0000000..3244bc3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+
+import org.junit.Test;
+
+/**
+ * Tests aggregation of {@link TimelineMetric} values through
+ * {@code TimelineMetric.aggregateTo}.
+ */
+public class TestTimelineMetric {
+
+  /**
+   * Walks single-value metrics through SUM, stateful SUM and MAX aggregation
+   * and checks that AVG is rejected with UnsupportedOperationException.
+   */
+  @Test
+  public void testTimelineMetricAggregation() {
+    final long now = System.currentTimeMillis();
+
+    // SUM into a null base: the test expects the incoming value unchanged.
+    TimelineMetric firstSum = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, now, 10000L);
+    TimelineMetric runningSum = TimelineMetric.aggregateTo(firstSum, null);
+    assertEquals(10000L, runningSum.getSingleDataValue());
+
+    // SUM on top of the running total.
+    TimelineMetric secondSum = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, now, 20000L);
+    runningSum = TimelineMetric.aggregateTo(secondSum, runningSum);
+    assertEquals(30000L, runningSum.getSingleDataValue());
+
+    // Stateful SUM: the state map holds the previously aggregated metric
+    // under PREV_METRIC_STATE_KEY.
+    Map<Object, Object> aggregationState = new HashMap<>();
+    aggregationState.put(
+        TimelineMetricOperation.PREV_METRIC_STATE_KEY, secondSum);
+    TimelineMetric updatedSum = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, now, 10000L);
+    runningSum = TimelineMetric.aggregateTo(updatedSum, runningSum,
+        aggregationState);
+    assertEquals(20000L, runningSum.getSingleDataValue());
+
+    // MAX: first against a null base, then against the running maximum.
+    TimelineMetric firstRate = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.MAX, now, 150L);
+    TimelineMetric runningMax = TimelineMetric.aggregateTo(firstRate, null);
+    assertEquals(150L, runningMax.getSingleDataValue());
+
+    TimelineMetric higherRate = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.MAX, now, 170L);
+    runningMax = TimelineMetric.aggregateTo(higherRate, runningMax);
+    assertEquals(170L, runningMax.getSingleDataValue());
+
+    // AVG: aggregation is expected to throw.
+    TimelineMetric averagedRate = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.AVG, now, 150L);
+    try {
+      TimelineMetric.aggregateTo(averagedRate, null);
+      fail("Taking average among metrics is not supported! ");
+    } catch (UnsupportedOperationException expected) {
+      // Expected
+    }
+  }
+
+  /**
+   * Builds a SINGLE_VALUE metric holding one (timestamp, value) data point.
+   */
+  private static TimelineMetric getSingleValueMetric(String id,
+      TimelineMetricOperation op, long timestamp, long value) {
+    Map<Long, Number> singlePoint = new HashMap<>();
+    singlePoint.put(timestamp, value);
+    return newMetric(id, Type.SINGLE_VALUE, op, singlePoint);
+  }
+
+  /**
+   * Builds a TIME_SERIES metric from the supplied data points.
+   */
+  private static TimelineMetric getTimeSeriesMetric(String id,
+      TimelineMetricOperation op, Map<Long, Number> metricValues) {
+    return newMetric(id, Type.TIME_SERIES, op, metricValues);
+  }
+
+  /**
+   * Creates a metric and applies the id, type, aggregation operation and
+   * values in that order, so the type is set before the values are.
+   */
+  private static TimelineMetric newMetric(String id, Type type,
+      TimelineMetricOperation op, Map<Long, Number> values) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setId(id);
+    metric.setType(type);
+    metric.setRealtimeAggregationOp(op);
+    metric.setValues(values);
+    return metric;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 2c45b87..a4e5b0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -101,6 +101,9 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { // Ignore all YARN Application Timeline Service (version 1) properties configurationPrefixToSkipCompare.add("yarn.timeline-service."); + // skip deprecated RM_SYSTEM_METRICS_PUBLISHER_ENABLED + configurationPropsToSkipCompare + .add(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED); // Used as Java command line properties, not XML configurationPrefixToSkipCompare.add("yarn.app.container"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index 2c1e36d..9dde71d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -70,6 +70,16 @@ <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + 
<type>test-jar</type> + <scope>test</scope> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 380b822..b9949e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -102,7 +102,6 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.LogManager; @@ -219,6 +218,8 @@ public class ApplicationMaster { // Tracking url to which app master publishes info for clients to monitor private String appMasterTrackingUrl = ""; + private boolean timelineServiceV2 = false; + // App Master configuration // No. 
of containers to run shell command on @VisibleForTesting @@ -552,6 +553,14 @@ public class ApplicationMaster { cliParser.getOptionValue("container_max_retries", "0")); containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( "container_retry_interval", "0")); + + if (YarnConfiguration.timelineServiceEnabled(conf)) { + timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf); + } else { + timelineClient = null; + LOG.warn("Timeline service is not enabled"); + } + return true; } @@ -599,7 +608,6 @@ public class ApplicationMaster { UserGroupInformation.createRemoteUser(appSubmitterUserName); appSubmitterUgi.addCredentials(credentials); - AMRMClientAsync.AbstractCallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); @@ -612,9 +620,18 @@ public class ApplicationMaster { nmClientAsync.start(); startTimelineClient(conf); + if (timelineServiceV2) { + // need to bind timelineClient + amRMClient.registerTimelineClient(timelineClient); + } if(timelineClient != null) { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + if (timelineServiceV2) { + publishApplicationAttemptEventOnTimelineServiceV2( + DSEvent.DS_APP_ATTEMPT_START); + } else { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + } } // Setup local RPC Server to accept status requests directly from clients @@ -684,10 +701,16 @@ public class ApplicationMaster { appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + if (YarnConfiguration.timelineServiceEnabled(conf)) { // Creating the Timeline Client - timelineClient = TimelineClient.createTimelineClient(); + if (timelineServiceV2) { 
+ timelineClient = TimelineClient.createTimelineClient( + appAttemptID.getApplicationId()); + LOG.info("Timeline service V2 client is enabled"); + } else { + timelineClient = TimelineClient.createTimelineClient(); + LOG.info("Timeline service V1 client is enabled"); + } timelineClient.init(conf); timelineClient.start(); } else { @@ -717,9 +740,14 @@ public class ApplicationMaster { } catch (InterruptedException ex) {} } - if(timelineClient != null) { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); + if (timelineClient != null) { + if (timelineServiceV2) { + publishApplicationAttemptEventOnTimelineServiceV2( + DSEvent.DS_APP_ATTEMPT_END); + } else { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); + } } // Join all launched threads @@ -825,8 +853,12 @@ public class ApplicationMaster { + containerStatus.getContainerId()); } if(timelineClient != null) { - publishContainerEndEvent( - timelineClient, containerStatus, domainId, appSubmitterUgi); + if (timelineServiceV2) { + publishContainerEndEventOnTimelineServiceV2(containerStatus); + } else { + publishContainerEndEvent( + timelineClient, containerStatus, domainId, appSubmitterUgi); + } } } @@ -946,12 +978,18 @@ public class ApplicationMaster { } Container container = containers.get(containerId); if (container != null) { - applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); + applicationMaster.nmClientAsync.getContainerStatusAsync( + containerId, container.getNodeId()); } if(applicationMaster.timelineClient != null) { - applicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container, - applicationMaster.domainId, applicationMaster.appSubmitterUgi); + if (applicationMaster.timelineServiceV2) { + applicationMaster.publishContainerStartEventOnTimelineServiceV2( + container); + } else { + 
applicationMaster.publishContainerStartEvent(
+              applicationMaster.timelineClient, container,
+              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+        }
       }
     }

@@ -1259,7 +1297,7 @@ public class ApplicationMaster {
       LOG.error("App Attempt "
           + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
           + " event could not be published for "
-          + appAttemptId.toString(), e);
+          + appAttemptID, e);
     }
   }

@@ -1306,4 +1344,144 @@ public class ApplicationMaster {
         shellId);
     return new Thread(runnableLaunchContainer);
   }
+
+  /**
+   * Publishes a DS_CONTAINER_START event for the given container through
+   * the timeline service V2 client, running the put as the app submitter.
+   * A failed put is logged and not rethrown.
+   *
+   * @param container the started container; may be null when the caller
+   *          could not look it up, in which case nothing is published
+   */
+  private void publishContainerStartEventOnTimelineServiceV2(
+      Container container) {
+    if (container == null) {
+      // The caller fetches the container by id and null-checks it only for
+      // the status query, so null can reach this method; bail out instead
+      // of failing with a NullPointerException below.
+      LOG.warn("Container start event could not be published:"
+          + " container is null");
+      return;
+    }
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
+            new org.apache.hadoop.yarn.api.records.timelineservice.
+            TimelineEntity();
+    entity.setId(container.getId().toString());
+    entity.setType(DSEntity.DS_CONTAINER.toString());
+    long ts = System.currentTimeMillis();
+    entity.setCreatedTime(ts);
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
+
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setTimestamp(ts);
+    event.setId(DSEvent.DS_CONTAINER_START.toString());
+    event.addInfo("Node", container.getNodeId().toString());
+    event.addInfo("Resources", container.getResource().toString());
+    entity.addEvent(event);
+
+    try {
+      // The put's result is not used, so the action is typed as Void.
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          timelineClient.putEntities(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Container start event could not be published for "
+          + container.getId(),
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
+  /**
+   * Publishes a DS_CONTAINER_END event carrying the container's state and
+   * exit status through the timeline service V2 client, running the put as
+   * the app submitter. A failed put is logged and not rethrown.
+   *
+   * @param container status of the completed container
+   */
+  private void publishContainerEndEventOnTimelineServiceV2(
+      final ContainerStatus container) {
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
+            new org.apache.hadoop.yarn.api.records.timelineservice.
+            TimelineEntity();
+    entity.setId(container.getContainerId().toString());
+    entity.setType(DSEntity.DS_CONTAINER.toString());
+    // NOTE(review): the V1 publisher is handed domainId, but no domain id
+    // is set on this entity (a setDomainId call was left commented out
+    // here); confirm whether V2 entities should carry one.
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setTimestamp(System.currentTimeMillis());
+    event.setId(DSEvent.DS_CONTAINER_END.toString());
+    event.addInfo("State", container.getState().name());
+    event.addInfo("Exit Status", container.getExitStatus());
+    entity.addEvent(event);
+
+    try {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          timelineClient.putEntities(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Container end event could not be published for "
+          + container.getContainerId(),
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
+  /**
+   * Publishes an application attempt start or end event through the
+   * timeline service V2 client, running the put as the app submitter. The
+   * entity's created time is set only for DS_APP_ATTEMPT_START. A failed
+   * put is logged and not rethrown.
+   *
+   * @param appEvent the attempt event to publish
+   */
+  private void publishApplicationAttemptEventOnTimelineServiceV2(
+      DSEvent appEvent) {
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
+            new org.apache.hadoop.yarn.api.records.timelineservice.
+            TimelineEntity();
+    entity.setId(appAttemptID.toString());
+    entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
+    long ts = System.currentTimeMillis();
+    if (appEvent == DSEvent.DS_APP_ATTEMPT_START) {
+      entity.setCreatedTime(ts);
+    }
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setId(appEvent.toString());
+    event.setTimestamp(ts);
+    entity.addEvent(event);
+
+    try {
+      // NOTE(review): the container publishers call putEntities while this
+      // one calls putEntitiesAsync; confirm that is intended.
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          timelineClient.putEntitiesAsync(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("App Attempt "
+          + (appEvent == DSEvent.DS_APP_ATTEMPT_START ? "start" : "end")
+          + " event could not be published for "
+          + appAttemptID,
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org